This is an automated email from the ASF dual-hosted git repository.
gvvinblade pushed a commit to branch ignite-12248
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-12248 by this push:
new bff7ad8 IGNITE-13198: Calcite integration. Rework error / cancel
logic at execution. This closes #7981
bff7ad8 is described below
commit bff7ad8d6aa36e1381b9e83be456c47a65d992e1
Author: Taras Ledkov <[email protected]>
AuthorDate: Tue Aug 25 22:00:27 2020 +0300
IGNITE-13198: Calcite integration. Rework error / cancel logic at
execution. This closes #7981
---
.../query/calcite/exec/ExchangeService.java | 27 +-
.../query/calcite/exec/ExchangeServiceImpl.java | 63 ++++-
.../query/calcite/exec/ExecutionContext.java | 18 --
.../query/calcite/exec/ExecutionServiceImpl.java | 289 +++++++--------------
.../processors/query/calcite/exec/IndexScan.java | 256 +++++++++---------
.../query/calcite/exec/LogicalRelImplementor.java | 4 +-
.../query/calcite/exec/MailboxRegistry.java | 12 +-
.../query/calcite/exec/MailboxRegistryImpl.java | 91 +++++--
.../processors/query/calcite/exec/TableScan.java | 251 ------------------
.../query/calcite/exec/rel/AbstractJoinNode.java | 15 ++
.../query/calcite/exec/rel/AbstractNode.java | 23 +-
.../query/calcite/exec/rel/AggregateNode.java | 9 +
.../query/calcite/exec/rel/FilterNode.java | 9 +
.../processors/query/calcite/exec/rel/Inbox.java | 122 ++++++---
.../MessageService.java => exec/rel/Mailbox.java} | 32 ++-
.../query/calcite/exec/rel/ModifyNode.java | 9 +
.../processors/query/calcite/exec/rel/Node.java | 7 +-
.../processors/query/calcite/exec/rel/Outbox.java | 93 ++++---
.../query/calcite/exec/rel/ProjectNode.java | 9 +
.../query/calcite/exec/rel/RootNode.java | 117 +++++----
.../query/calcite/exec/rel/ScanNode.java | 45 +++-
.../query/calcite/exec/rel/SortNode.java | 43 ++-
.../{InboxCancelMessage.java => ErrorMessage.java} | 86 +++---
...oxCancelMessage.java => InboxCloseMessage.java} | 57 ++--
.../query/calcite/message/MessageService.java | 19 ++
.../query/calcite/message/MessageServiceImpl.java | 23 +-
.../query/calcite/message/MessageType.java | 8 +-
...yCancelRequest.java => OutboxCloseMessage.java} | 64 ++++-
.../query/calcite/message/QueryStartRequest.java | 13 +-
.../RemoteException.java} | 52 +++-
.../processors/query/calcite/prepare/Cloner.java | 1 +
.../processors/query/calcite/util/Commons.java | 14 -
.../processors/query/calcite/CancelTest.java | 277 ++++++++++++++++++++
.../processors/query/calcite/PlannerTest.java | 9 +-
.../calcite/exec/rel/AbstractExecutionTest.java | 5 +
.../calcite/exec/rel/ContinuousExecutionTest.java | 2 +-
.../query/calcite/exec/rel/ExecutionTest.java | 36 +--
.../ignite/testsuites/IgniteCalciteTestSuite.java | 4 +-
38 files changed, 1224 insertions(+), 990 deletions(-)
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeService.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeService.java
index 731023b..ac1f58f 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeService.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeService.java
@@ -56,7 +56,30 @@ public interface ExchangeService extends Service {
* @param qryId Query ID.
* @param fragmentId Target fragment ID.
* @param exchangeId Exchange ID.
- * @param batchId Batch ID.
*/
- void cancel(UUID nodeId, UUID qryId, long fragmentId, long exchangeId, int
batchId) throws IgniteCheckedException;
+ void closeInbox(UUID nodeId, UUID qryId, long fragmentId, long exchangeId)
throws IgniteCheckedException;
+
+ /**
+ * Sends cancel request.
+ * @param nodeId Target node ID.
+ * @param qryId Query ID.
+ * @param fragmentId Target fragment ID.
+ * @param exchangeId Exchange ID.
+ */
+ void closeOutbox(UUID nodeId, UUID qryId, long fragmentId, long
exchangeId) throws IgniteCheckedException;
+
+ /**
+ * @param nodeId Target node ID.
+ * @param qryId Query ID.
+ * @param fragmentId Source fragment ID.
+ * @param err Exception to send.
+ * @throws IgniteCheckedException On error marshaling or send ErrorMessage.
+ */
+ void sendError(UUID nodeId, UUID qryId, long fragmentId, Throwable err)
throws IgniteCheckedException;
+
+ /**
+ * @param nodeId Node ID.
+ * @return {@code true} if node is alive, {@code false} otherwise.
+ */
+ boolean alive(UUID nodeId);
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
index 4ab6cb6..fc2879b 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.query.calcite.exec;
+import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
@@ -27,15 +28,18 @@ import org.apache.ignite.internal.GridKernalContext;
import
org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.Inbox;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.Outbox;
-import
org.apache.ignite.internal.processors.query.calcite.message.InboxCancelMessage;
+import
org.apache.ignite.internal.processors.query.calcite.message.ErrorMessage;
+import
org.apache.ignite.internal.processors.query.calcite.message.InboxCloseMessage;
import
org.apache.ignite.internal.processors.query.calcite.message.MessageService;
import org.apache.ignite.internal.processors.query.calcite.message.MessageType;
+import
org.apache.ignite.internal.processors.query.calcite.message.OutboxCloseMessage;
import
org.apache.ignite.internal.processors.query.calcite.message.QueryBatchAcknowledgeMessage;
import
org.apache.ignite.internal.processors.query.calcite.message.QueryBatchMessage;
import
org.apache.ignite.internal.processors.query.calcite.prepare.FragmentDescription;
import
org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
import
org.apache.ignite.internal.processors.query.calcite.util.AbstractService;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.typedef.F;
/**
*
@@ -111,8 +115,18 @@ public class ExchangeServiceImpl extends AbstractService
implements ExchangeServ
}
/** {@inheritDoc} */
- @Override public void cancel(UUID nodeId, UUID qryId, long fragmentId,
long exchangeId, int batchId) throws IgniteCheckedException {
- messageService().send(nodeId, new InboxCancelMessage(qryId,
fragmentId, exchangeId, batchId));
+ @Override public void closeOutbox(UUID nodeId, UUID qryId, long
fragmentId, long exchangeId) throws IgniteCheckedException {
+ messageService().send(nodeId, new OutboxCloseMessage(qryId,
fragmentId, exchangeId));
+ }
+
+ /** {@inheritDoc} */
+ @Override public void closeInbox(UUID nodeId, UUID qryId, long fragmentId,
long exchangeId) throws IgniteCheckedException {
+ messageService().send(nodeId, new InboxCloseMessage(qryId, fragmentId,
exchangeId));
+ }
+
+ /** {@inheritDoc} */
+ @Override public void sendError(UUID nodeId, UUID qryId, long fragmentId,
Throwable err) throws IgniteCheckedException {
+ messageService().send(nodeId, new ErrorMessage(qryId, fragmentId,
err));
}
/** {@inheritDoc} */
@@ -129,24 +143,46 @@ public class ExchangeServiceImpl extends AbstractService
implements ExchangeServ
/** {@inheritDoc} */
@Override public void init() {
- messageService().register((n, m) -> onMessage(n, (InboxCancelMessage)
m), MessageType.QUERY_INBOX_CANCEL_MESSAGE);
+ messageService().register((n, m) -> onMessage(n, (InboxCloseMessage)
m), MessageType.QUERY_INBOX_CANCEL_MESSAGE);
+ messageService().register((n, m) -> onMessage(n, (OutboxCloseMessage)
m), MessageType.QUERY_OUTBOX_CANCEL_MESSAGE);
messageService().register((n, m) -> onMessage(n,
(QueryBatchAcknowledgeMessage) m), MessageType.QUERY_ACKNOWLEDGE_MESSAGE);
messageService().register((n, m) -> onMessage(n, (QueryBatchMessage)
m), MessageType.QUERY_BATCH_MESSAGE);
}
+ /** {@inheritDoc} */
+ @Override public boolean alive(UUID nodeId) {
+ return messageService().alive(nodeId);
+ }
+
/** */
- protected void onMessage(UUID nodeId, InboxCancelMessage msg) {
- Inbox<?> inbox = mailboxRegistry().inbox(msg.queryId(),
msg.exchangeId());
+ protected void onMessage(UUID nodeId, InboxCloseMessage msg) {
+ Collection<Inbox<?>> inboxes =
mailboxRegistry().inboxes(msg.queryId(), msg.fragmentId(), msg.exchangeId());
+ if (!F.isEmpty(inboxes)) {
+ for (Inbox<?> inbox : inboxes)
+ inbox.context().execute(inbox::close);
+ }
+ else if (log.isDebugEnabled()) {
+ log.debug("Stale inbox cancel message received: [" +
+ "nodeId=" + nodeId + ", " +
+ "queryId=" + msg.queryId() + ", " +
+ "fragmentId=" + msg.fragmentId() + ", " +
+ "exchangeId=" + msg.exchangeId() + "]");
+ }
+ }
- if (inbox != null)
- inbox.cancel();
+ /** */
+ protected void onMessage(UUID nodeId, OutboxCloseMessage msg) {
+ Collection<Outbox<?>> outboxes =
mailboxRegistry().outboxes(msg.queryId(), msg.fragmentId(), msg.exchangeId());
+ if (!F.isEmpty(outboxes)) {
+ for (Outbox<?> outbox : outboxes)
+ outbox.context().execute(outbox::close);
+ }
else if (log.isDebugEnabled()) {
- log.debug("Stale cancel message received: [" +
+ log.debug("Stale oubox cancel message received: [" +
"nodeId=" + nodeId + ", " +
"queryId=" + msg.queryId() + ", " +
"fragmentId=" + msg.fragmentId() + ", " +
- "exchangeId=" + msg.exchangeId() + ", " +
- "batchId=" + msg.batchId() + "]");
+ "exchangeId=" + msg.exchangeId() + "]");
}
}
@@ -173,7 +209,7 @@ public class ExchangeServiceImpl extends AbstractService
implements ExchangeServ
if (inbox == null && msg.batchId() == 0) {
// first message sent before a fragment is built
// note that an inbox source fragment id is also used as an
exchange id
- Inbox<?> newInbox = new Inbox<>(baseInboxContext(msg.queryId(),
msg.fragmentId()),
+ Inbox<?> newInbox = new Inbox<>(baseInboxContext(nodeId,
msg.queryId(), msg.fragmentId()),
this, mailboxRegistry(), msg.exchangeId(), msg.exchangeId());
inbox = mailboxRegistry().register(newInbox);
@@ -194,10 +230,11 @@ public class ExchangeServiceImpl extends AbstractService
implements ExchangeServ
/**
* @return Minimal execution context to meet Inbox needs.
*/
- private ExecutionContext<?> baseInboxContext(UUID qryId, long fragmentId) {
+ private ExecutionContext<?> baseInboxContext(UUID nodeId, UUID qryId, long
fragmentId) {
return new ExecutionContext<>(
taskExecutor(),
PlanningContext.builder()
+ .originatingNodeId(nodeId)
.logger(log)
.build(),
qryId,
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
index 70d52b0..4e7cdd5 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
@@ -57,9 +57,6 @@ public class ExecutionContext<Row> implements DataContext {
/** */
private final ExpressionFactory<Row> expressionFactory;
- /** */
- private volatile boolean cancelled;
-
/**
* @param ctx Parent context.
* @param qryId Query ID.
@@ -150,13 +147,6 @@ public class ExecutionContext<Row> implements DataContext {
}
/**
- * @return Cancelled flag.
- */
- public boolean cancelled() {
- return cancelled;
- }
-
- /**
* @return Handler to access row fields.
*/
public RowHandler<Row> rowHandler() {
@@ -198,14 +188,6 @@ public class ExecutionContext<Row> implements DataContext {
}
/**
- * Sets cancelled flag.
- */
- public void markCancelled() {
- if (!cancelled)
- cancelled = true;
- }
-
- /**
* Executes a query task.
*
* @param task Query task.
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
index 4826dbe..aea46c7 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
@@ -27,7 +27,6 @@ import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Predicate;
import com.google.common.collect.ImmutableList;
import org.apache.calcite.plan.ConventionTraitDef;
@@ -49,7 +48,6 @@ import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.ValidationException;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.QueryCancelledException;
import org.apache.ignite.events.EventType;
@@ -68,17 +66,19 @@ import
org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.QueryCancellable;
import org.apache.ignite.internal.processors.query.QueryContext;
import
org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.Inbox;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.Node;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.Outbox;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.RootNode;
+import
org.apache.ignite.internal.processors.query.calcite.message.ErrorMessage;
import
org.apache.ignite.internal.processors.query.calcite.message.MessageService;
import org.apache.ignite.internal.processors.query.calcite.message.MessageType;
-import
org.apache.ignite.internal.processors.query.calcite.message.QueryCancelRequest;
import
org.apache.ignite.internal.processors.query.calcite.message.QueryStartRequest;
import
org.apache.ignite.internal.processors.query.calcite.message.QueryStartResponse;
import
org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
import
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
import
org.apache.ignite.internal.processors.query.calcite.metadata.PartitionService;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.RemoteException;
import org.apache.ignite.internal.processors.query.calcite.prepare.CacheKey;
import
org.apache.ignite.internal.processors.query.calcite.prepare.CalciteQueryFieldMetadata;
import org.apache.ignite.internal.processors.query.calcite.prepare.ExplainPlan;
@@ -399,13 +399,10 @@ public class ExecutionServiceImpl<Row> extends
AbstractService implements Execut
/** {@inheritDoc} */
@Override public void cancelQuery(UUID qryId) {
- mailboxRegistry().outboxes(qryId).forEach(this::executeCancel);
- mailboxRegistry().inboxes(qryId).forEach(this::executeCancel);
-
QueryInfo info = running.get(qryId);
if (info != null)
- info.cancel();
+ info.doCancel();
}
/** {@inheritDoc} */
@@ -435,7 +432,7 @@ public class ExecutionServiceImpl<Row> extends
AbstractService implements Execut
@Override public void init() {
messageService().register((n,m) -> onMessage(n, (QueryStartRequest)
m), MessageType.QUERY_START_REQUEST);
messageService().register((n,m) -> onMessage(n, (QueryStartResponse)
m), MessageType.QUERY_START_RESPONSE);
- messageService().register((n,m) -> onMessage(n, (QueryCancelRequest)
m), MessageType.QUERY_CANCEL_REQUEST);
+ messageService().register((n,m) -> onMessage(n, (ErrorMessage) m),
MessageType.QUERY_ERROR_MESSAGE);
eventManager().addDiscoveryEventListener(discoLsnr,
EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT);
@@ -722,47 +719,38 @@ public class ExecutionServiceImpl<Row> extends
AbstractService implements Execut
register(info);
// start remote execution
- if (fragments.size() > 1) {
- for (int i = 1; i < fragments.size(); i++) {
- Fragment fragment0 = fragments.get(i);
- NodesMapping mapping0 = plan.fragmentMapping(fragment0);
-
- boolean error = false;
-
- for (UUID nodeId : mapping0.nodes()) {
- if (error)
- info.onResponse(nodeId, fragment0.fragmentId(), new
QueryCancelledException());
- else {
- try {
- FragmentDescription fragmentDesc0 = new
FragmentDescription(
- fragment0.fragmentId(),
- mapping0.partitions(nodeId),
- mapping0.assignments().size(),
- plan.targetMapping(fragment0),
- plan.remoteSources(fragment0)
- );
-
- QueryStartRequest req = new QueryStartRequest(
- qryId,
- pctx.schemaName(),
- toJson(fragment0.root()),
- pctx.topologyVersion(),
- fragmentDesc0,
- pctx.parameters());
-
- messageService().send(nodeId, req);
- }
- catch (Exception e) {
- info.onResponse(nodeId, fragment0.fragmentId(), e);
- error = true;
- }
+ for (int i = 1; i < fragments.size(); i++) {
+ Fragment fragment0 = fragments.get(i);
+ NodesMapping mapping0 = plan.fragmentMapping(fragment0);
+
+ boolean error = false;
+ for (UUID nodeId : mapping0.nodes()) {
+ if (error)
+ info.onResponse(nodeId, fragment0.fragmentId(), new
QueryCancelledException());
+ else {
+ try {
+ FragmentDescription fragmentDesc0 = new
FragmentDescription(
+ fragment0.fragmentId(),
+ mapping0.partitions(nodeId),
+ mapping0.assignments().size(),
+ plan.targetMapping(fragment0),
+ plan.remoteSources(fragment0)
+ );
+
+ QueryStartRequest req = new QueryStartRequest(
+ qryId,
+ pctx.schemaName(),
+ toJson(fragment0.root()),
+ pctx.topologyVersion(),
+ fragmentDesc0,
+ pctx.parameters());
+
+ messageService().send(nodeId, req);
+ }
+ catch (Exception e) {
+ info.onResponse(nodeId, fragment0.fragmentId(), e);
+ error = true;
}
- }
-
- if (error) {
- info.awaitAllReplies();
-
- throw new AssertionError(); // Previous call must throw an
exception
}
}
}
@@ -844,58 +832,49 @@ public class ExecutionServiceImpl<Row> extends
AbstractService implements Execut
assert nodeId != null && msg != null;
PlanningContext ctx = createContext(msg.schema(), nodeId,
msg.topologyVersion());
+ ExecutionContext<Row> execCtx = new ExecutionContext<>(taskExecutor(),
ctx, msg.queryId(),
+ msg.fragmentDescription(), handler,
Commons.parametersMap(msg.parameters()));
+ Outbox<Row> node;
try {
- ExecutionContext<Row> execCtx = new ExecutionContext<>(
- taskExecutor(),
- ctx,
- msg.queryId(),
- msg.fragmentDescription(),
- handler,
- Commons.parametersMap(msg.parameters())
- );
-
- Node<Row> node = new LogicalRelImplementor<>(
+ node = new LogicalRelImplementor<>(
execCtx,
partitionService(),
mailboxRegistry(),
exchangeService(),
failureProcessor())
.go(fromJson(ctx, msg.root()));
-
- assert node instanceof Outbox : node;
-
- node.context().execute(((Outbox<Row>) node)::init);
-
- messageService().send(nodeId, new
QueryStartResponse(msg.queryId(), msg.fragmentDescription().fragmentId()));
}
- catch (Throwable ex) { // TODO don't catch errors!
- cancelQuery(msg.queryId());
-
- if (ex instanceof ClusterTopologyCheckedException)
- return;
+ catch (Exception ex) {
+ U.error(log, "Failed to build execution tree. ", ex);
- U.warn(log, "Failed to start query. [nodeId=" + nodeId + ']', ex);
+ mailboxRegistry.outboxes(msg.queryId(), msg.fragmentId(), -1)
+ .forEach(Outbox::close);
+ mailboxRegistry.inboxes(msg.queryId(), msg.fragmentId(), -1)
+ .forEach(Inbox::close);
try {
- messageService().send(nodeId, new
QueryStartResponse(msg.queryId(), msg.fragmentDescription().fragmentId(), ex));
+ messageService().send(nodeId, new
QueryStartResponse(msg.queryId(), msg.fragmentId(), ex));
}
catch (IgniteCheckedException e) {
- e.addSuppressed(ex);
-
U.warn(log, "Failed to send reply. [nodeId=" + nodeId + ']',
e);
}
- if (ex instanceof Error)
- throw (Error)ex;
+ return;
}
- }
- /** */
- private void onMessage(UUID nodeId, QueryCancelRequest msg) {
- assert nodeId != null && msg != null;
+ try {
+ messageService().send(nodeId, new
QueryStartResponse(msg.queryId(), msg.fragmentDescription().fragmentId()));
+ }
+ catch (IgniteCheckedException e) {
+ U.warn(log, "Failed to send reply. [nodeId=" + nodeId + ']', e);
+
+ node.onNodeLeft(nodeId);
+
+ return;
+ }
- cancelQuery(msg.queryId());
+ node.init();
}
/** */
@@ -909,37 +888,18 @@ public class ExecutionServiceImpl<Row> extends
AbstractService implements Execut
}
/** */
- private void onCursorClose(RootNode<?> rootNode) {
- switch (rootNode.state()) {
- case CANCELLED:
- cancelQuery(rootNode.queryId());
+ private void onMessage(UUID nodeId, ErrorMessage msg) {
+ assert nodeId != null && msg != null;
- break;
- case END:
- running.remove(rootNode.queryId());
+ QueryInfo info = running.get(msg.queryId());
- break;
- default:
- throw new AssertionError();
- }
+ if (info != null)
+ info.onError(new RemoteException(nodeId, msg.queryId(),
msg.fragmentId(), msg.error()));
}
/** */
private void onNodeLeft(UUID nodeId) {
running.forEach((uuid, queryInfo) -> queryInfo.onNodeLeft(nodeId));
-
- final Predicate<Node<?>> p = new OriginatingFilter(nodeId);
-
- mailboxRegistry().outboxes(null).stream()
- .filter(p).forEach(this::executeCancel);
-
- mailboxRegistry().inboxes(null).stream()
- .filter(p).forEach(this::executeCancel);
- }
-
- /** */
- private void executeCancel(Node<?> node) {
- node.context().execute(node::cancel);
}
/** */
@@ -948,10 +908,10 @@ public class ExecutionServiceImpl<Row> extends
AbstractService implements Execut
RUNNING,
/** */
- CANCELLING,
+ CLOSING,
/** */
- CANCELLED
+ CLOSED
}
/** */
@@ -1009,13 +969,10 @@ public class ExecutionServiceImpl<Row> extends
AbstractService implements Execut
private QueryState state;
/** */
- private Throwable error;
-
- /** */
private QueryInfo(ExecutionContext<Row> ctx, MultiStepPlan plan,
Node<Row> root) {
this.ctx = ctx;
- RootNode<Row> rootNode = new RootNode<>(ctx,
ExecutionServiceImpl.this::onCursorClose);
+ RootNode<Row> rootNode = new RootNode<>(ctx, this::tryClose);
rootNode.register(root);
this.root = rootNode;
@@ -1043,71 +1000,43 @@ public class ExecutionServiceImpl<Row> extends
AbstractService implements Execut
/** {@inheritDoc} */
@Override public void doCancel() {
- cancel();
- }
-
- /** */
- private void awaitAllReplies() {
- Throwable error;
-
- try {
- synchronized (this) {
- while (!waiting.isEmpty())
- wait();
-
- error = this.error;
- }
- }
- catch (InterruptedException e) {
- throw new IgniteInterruptedException(e);
- }
-
- if (error != null)
- throw new IgniteSQLException("Failed to execute query.",
error);
+ root.close();
}
- /** */
- private void cancel() {
- boolean cancelLoc = false;
- boolean cancelRemote = false;
+ /**
+ * Can be called multiple times after receive each error at {@link
#onResponse(RemoteFragmentKey, Throwable)}.
+ */
+ private void tryClose() {
QueryState state0 = null;
synchronized (this) {
- if (state == QueryState.CANCELLED)
+ if (state == QueryState.CLOSED)
return;
- if (state == QueryState.RUNNING) {
- cancelLoc = true;
- state0 = state = QueryState.CANCELLING;
- }
+ if (state == QueryState.RUNNING)
+ state0 = state = QueryState.CLOSING;
- if (state == QueryState.CANCELLING && waiting.isEmpty()) {
- cancelRemote = true;
- state0 = state = QueryState.CANCELLED;
- }
+ if (state == QueryState.CLOSING && waiting.isEmpty())
+ state0 = state = QueryState.CLOSED;
}
- if (cancelLoc)
- root.cancel();
+ if (state0 == QueryState.CLOSED) {
+ // 1) unregister runing query
+ running.remove(ctx.queryId());
- if (cancelRemote) {
- QueryCancelRequest msg = new QueryCancelRequest(ctx.queryId());
+ // 2) close local fragment
+ root.proceedClose();
- for (UUID remote : remotes) {
+ // 3) close remote fragments
+ for (UUID nodeId : remotes) {
try {
- messageService().send(remote, msg);
- }
- catch (ClusterTopologyCheckedException e) {
- U.warn(log, e.getMessage(), e);
+ exchangeService().closeOutbox(nodeId, ctx.queryId(),
-1, -1);
}
catch (IgniteCheckedException e) {
- throw U.convertException(e);
+ U.warn(log, "Failed to send cancel message. [nodeId="
+ nodeId + ']', e);
}
}
}
-
- if (state0 == QueryState.CANCELLED)
- running.remove(ctx.queryId());
}
/** */
@@ -1127,7 +1056,8 @@ public class ExecutionServiceImpl<Row> extends
AbstractService implements Execut
}
if (!F.isEmpty(fragments)) {
- ClusterTopologyCheckedException ex = new
ClusterTopologyCheckedException("Failed to start query, node left. nodeId=" +
nodeId);
+ ClusterTopologyCheckedException ex = new
ClusterTopologyCheckedException(
+ "Failed to start query, node left. nodeId=" + nodeId);
for (RemoteFragmentKey fragment : fragments)
onResponse(fragment, ex);
@@ -1141,46 +1071,23 @@ public class ExecutionServiceImpl<Row> extends
AbstractService implements Execut
/** */
private void onResponse(RemoteFragmentKey fragment, Throwable error) {
- boolean cancel;
-
+ QueryState state;
synchronized (this) {
- if (!waiting.remove(fragment))
- return;
-
- if (error != null) {
- if (this.error != null)
- this.error.addSuppressed(error);
- else
- this.error = error;
- }
-
- boolean empty = waiting.isEmpty();
-
- cancel = empty && this.error != null;
-
- if (empty)
- notifyAll();
+ waiting.remove(fragment);
+ state = this.state;
}
- if (cancel)
- cancel();
+ if (error != null)
+ onError(error);
+ else if (state == QueryState.CLOSING)
+ tryClose();
}
- }
-
- /** */
- private static final class OriginatingFilter implements Predicate<Node<?>>
{
- /** */
- private final UUID nodeId;
/** */
- private OriginatingFilter(UUID nodeId) {
- this.nodeId = nodeId;
- }
+ private void onError(Throwable error) {
+ root.onError(error);
- /** {@inheritDoc} */
- @Override public boolean test(Node node) {
- // Uninitialized inbox doesn't know originating node ID.
- return Objects.equals(node.context().originatingNodeId(), nodeId);
+ tryClose();
}
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java
index a6d720f..e15a8b6 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java
@@ -43,30 +43,32 @@ import
org.apache.ignite.internal.processors.query.calcite.schema.IgniteIndex;
import
org.apache.ignite.internal.processors.query.calcite.schema.TableDescriptor;
import org.apache.ignite.internal.processors.query.h2.H2Utils;
import
org.apache.ignite.internal.processors.query.h2.database.H2TreeFilterClosure;
+import org.apache.ignite.internal.processors.query.h2.opt.H2PlainRow;
import org.apache.ignite.internal.processors.query.h2.opt.H2Row;
-import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
import org.apache.ignite.internal.util.lang.GridCursor;
+import org.apache.ignite.internal.util.lang.GridIteratorAdapter;
import org.apache.ignite.spi.indexing.IndexingQueryCacheFilter;
import org.apache.ignite.spi.indexing.IndexingQueryFilter;
import org.apache.ignite.spi.indexing.IndexingQueryFilterImpl;
import org.h2.value.DataType;
import org.h2.value.Value;
+import org.jetbrains.annotations.NotNull;
/**
* Scan on index.
*/
-public class IndexScan<Row> implements Iterable<Row> {
+public class IndexScan<Row> implements Iterable<Row>, AutoCloseable {
/** */
- private final ExecutionContext<Row> ectx;
+ private final GridKernalContext kctx;
/** */
- private final CacheObjectContext coCtx;
+ private final GridCacheContext<?, ?> cctx;
/** */
- private final GridKernalContext ctx;
+ private final ExecutionContext<Row> ectx;
/** */
- private final GridCacheContext<?, ?> cacheCtx;
+ private final CacheObjectContext coCtx;
/** */
private final TableDescriptor desc;
@@ -96,154 +98,172 @@ public class IndexScan<Row> implements Iterable<Row> {
private final MvccSnapshot mvccSnapshot;
/** */
- private List<GridDhtLocalPartition> partsToReserve;
+ private List<GridDhtLocalPartition> reserved;
/**
- * @param ctx Cache context.
+ * @param ectx Execution context.
* @param igniteIdx Index tree.
* @param filters Additional filters.
* @param lowerBound Lower index scan bound.
* @param upperBound Upper index scan bound.
*/
public IndexScan(
- ExecutionContext<Row> ctx,
+ ExecutionContext<Row> ectx,
IgniteIndex igniteIdx,
Predicate<Row> filters,
Row lowerBound,
Row upperBound
) {
- ectx = ctx;
+ this.ectx = ectx;
desc = igniteIdx.table().descriptor();
+ cctx = desc.cacheContext();
+ kctx = cctx.kernalContext();
+ coCtx = cctx.cacheObjectContext();
+
+ RelDataType rowType = desc.selectRowType(this.ectx.getTypeFactory());
+
+ factory = this.ectx.rowHandler().factory(this.ectx.getTypeFactory(),
rowType);
idx = igniteIdx.index();
+ topVer = ectx.planningContext().topologyVersion();
this.filters = filters;
this.lowerBound = lowerBound;
this.upperBound = upperBound;
- cacheCtx = igniteIdx.table().descriptor().cacheContext();
- coCtx = cacheCtx.cacheObjectContext();
- this.ctx = coCtx.kernalContext();
- topVer = ctx.planningContext().topologyVersion();
- partsArr = ctx.partitions();
- mvccSnapshot = ctx.mvccSnapshot();
-
- RelDataType rowType = desc.selectRowType(ectx.getTypeFactory());
- factory = ectx.rowHandler().factory(ectx.getTypeFactory(), rowType);
+ partsArr = ectx.partitions();
+ mvccSnapshot = ectx.mvccSnapshot();
}
/** {@inheritDoc} */
- @Override public Iterator<Row> iterator() {
- H2TreeFilterClosure filterC = filterClosure();
+ @Override public synchronized Iterator<Row> iterator() {
+ reserve();
+ try {
+ H2Row lower = lowerBound == null ? null : new
H2PlainRow(values(coCtx, ectx, lowerBound));
+ H2Row upper = upperBound == null ? null : new
H2PlainRow(values(coCtx, ectx, upperBound));
- H2Row lower = lowerBound == null ? null : new CalciteH2Row<>(coCtx,
ectx, lowerBound);
- H2Row upper = upperBound == null ? null : new CalciteH2Row<>(coCtx,
ectx, upperBound);
-
- reservePartitions();
-
- GridCursor<H2Row> cur = idx.find(lower, upper, filterC);
+ return new IteratorImpl(idx.find(lower, upper, filterClosure()));
+ }
+ catch (Exception e) {
+ release();
- return new CursorIteratorWrapper(cur);
+ throw e;
+ }
}
/** */
- public H2TreeFilterClosure filterClosure() {
- IndexingQueryFilter filter = new IndexingQueryFilterImpl(ctx, topVer,
partsArr);
- IndexingQueryCacheFilter f = filter.forCache(cacheCtx.name());
- H2TreeFilterClosure filterC = null;
-
- if (f != null || mvccSnapshot != null )
- filterC = new H2TreeFilterClosure(f, mvccSnapshot, cacheCtx,
ectx.planningContext().logger());
-
- return filterC;
+ @Override public void close() {
+ release();
}
/** */
- private void reservePartitions() {
- assert partsToReserve == null : partsToReserve;
+ private synchronized void reserve() {
+ if (reserved != null)
+ return;
- partsToReserve = gatherPartitions(cacheCtx, partsArr);
+ List<GridDhtLocalPartition> toReserve;
- for (GridDhtLocalPartition part : partsToReserve) {
- if (part == null || !part.reserve())
- throw reservationException();
- else if (part.state() != GridDhtPartitionState.OWNING) {
- part.release();
+ if (cctx.isReplicated()) {
+ int partsCnt = cctx.affinity().partitions();
+ toReserve = new ArrayList<>(partsCnt);
+ GridDhtPartitionTopology top = cctx.topology();
+ for (int i = 0; i < partsCnt; i++)
+ toReserve.add(top.localPartition(i));
+ }
+ else if (cctx.isPartitioned()) {
+ assert partsArr != null;
- throw reservationException();
- }
+ toReserve = new ArrayList<>(partsArr.length);
+ GridDhtPartitionTopology top = cctx.topology();
+ for (int i = 0; i < partsArr.length; i++)
+ toReserve.add(top.localPartition(partsArr[i]));
}
- }
+ else {
+ assert cctx.isLocal();
- /** */
- private List<GridDhtLocalPartition> gatherPartitions(GridCacheContext<?,
?> ctx, int[] arr ) {
- if (ctx.isReplicated()) {
- int partsCnt = ctx.affinity().partitions();
- GridDhtPartitionTopology top = ctx.topology();
- List<GridDhtLocalPartition> parts = new ArrayList<>(partsCnt);
+ toReserve = Collections.emptyList();
+ }
- for (int i = 0; i < partsCnt; i++)
- parts.add(top.localPartition(i));
+ reserved = new ArrayList<>(toReserve.size());
- return parts;
- }
- else if (ctx.isPartitioned()) {
- assert arr != null;
- List<GridDhtLocalPartition> parts = new ArrayList<>(arr.length);
- GridDhtPartitionTopology top = ctx.topology();
+ try {
+ for (GridDhtLocalPartition part : toReserve) {
+ if (part == null || !part.reserve())
+ throw new IgniteSQLException("Failed to reserve partition
for query execution. Retry on stable topology.");
+ else if (part.state() != GridDhtPartitionState.OWNING) {
+ part.release();
- for (int i = 0; i < arr.length; i++)
- parts.add(top.localPartition(arr[i]));
+ throw new IgniteSQLException("Failed to reserve partition
for query execution. Retry on stable topology.");
+ }
- return parts;
+ reserved.add(part);
+ }
}
- else {
- assert ctx.isLocal();
+ catch (Exception e) {
+ release();
- return Collections.emptyList();
+ throw e;
}
}
/** */
- private void releasePartitions() {
- assert partsToReserve != null;
+ private synchronized void release() {
+ if (reserved == null)
+ return;
- for (GridDhtLocalPartition part : partsToReserve)
+ for (GridDhtLocalPartition part : reserved)
part.release();
+
+ reserved = null;
}
/** */
- private IgniteSQLException reservationException() {
- return new IgniteSQLException("Failed to reserve partition for query
execution. Retry on stable topology.");
+ private H2TreeFilterClosure filterClosure() {
+ IndexingQueryFilter filter = new IndexingQueryFilterImpl(kctx, topVer,
partsArr);
+ IndexingQueryCacheFilter f = filter.forCache(cctx.name());
+ H2TreeFilterClosure filterC = null;
+
+ if (f != null || mvccSnapshot != null )
+ filterC = new H2TreeFilterClosure(f, mvccSnapshot, cctx,
ectx.planningContext().logger());
+
+ return filterC;
}
/** */
- private class CursorIteratorWrapper extends
GridCloseableIteratorAdapter<Row> {
+ private Value[] values(CacheObjectValueContext cctx, ExecutionContext<Row>
ectx, Row row) {
+ try {
+ RowHandler<Row> rowHnd = ectx.rowHandler();
+ int rowLen = rowHnd.columnCount(row);
+
+ Value[] values = new Value[rowLen];
+ for (int i = 0; i < rowLen; i++) {
+ Object o = rowHnd.get(i, row);
+
+ if (o != null)
+ values[i] = H2Utils.wrap(cctx, o,
DataType.getTypeFromClass(o.getClass()));
+ }
+
+ return values;
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException("Failed to wrap object into H2 Value.",
e);
+ }
+ }
+
+ /** */
+ private class IteratorImpl extends GridIteratorAdapter<Row> {
/** */
private final GridCursor<H2Row> cursor;
/** Next element. */
private Row next;
- /**
- * @param cursor Cursor.
- */
- CursorIteratorWrapper(GridCursor<H2Row> cursor) {
- assert cursor != null;
+ /** */
+ public IteratorImpl(@NotNull GridCursor<H2Row> cursor) {
this.cursor = cursor;
}
/** {@inheritDoc} */
- @Override protected Row onNext() {
- if (next == null)
- throw new NoSuchElementException();
-
- Row res = next;
-
- next = null;
-
- return res;
- }
+ @Override public boolean hasNextX() throws IgniteCheckedException {
+ assert cursor != null;
- /** {@inheritDoc} */
- @Override protected boolean onHasNext() throws IgniteCheckedException {
if (next != null)
return true;
@@ -259,58 +279,22 @@ public class IndexScan<Row> implements Iterable<Row> {
}
/** {@inheritDoc} */
- @Override protected void onClose() {
- releasePartitions();
- }
- }
-
- /** */
- private static class CalciteH2Row<Row> extends H2Row {
- /** */
- private final Value[] values;
-
- /** */
- CalciteH2Row(CacheObjectValueContext coCtx, ExecutionContext<Row> ctx,
Row row) {
- try {
- RowHandler<Row> rowHnd = ctx.rowHandler();
-
- int colCnt = rowHnd.columnCount(row);
-
- values = new Value[colCnt];
-
- for (int i = 0; i < colCnt; i++) {
- Object o = rowHnd.get(i, row);
-
- if (o != null) {
- Value v = H2Utils.wrap(coCtx, o,
DataType.getTypeFromClass(o.getClass()));
+ @Override public Row nextX() {
+ assert cursor != null;
- values[i] = v;
- }
- }
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException("Failed to wrap object into H2
Value.", e);
- }
- }
+ if (next == null)
+ throw new NoSuchElementException();
- /** {@inheritDoc} */
- @Override public boolean indexSearchRow() {
- return true;
- }
+ Row res = next;
- /** {@inheritDoc} */
- @Override public int getColumnCount() {
- return values.length;
- }
+ next = null;
- /** {@inheritDoc} */
- @Override public Value getValue(int idx) {
- return values[idx];
+ return res;
}
/** {@inheritDoc} */
- @Override public void setValue(int idx, Value v) {
- throw new AssertionError("Not supported.");
+ @Override public void removeX() {
+ throw new UnsupportedOperationException("Remove is not
supported.");
}
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
index bb4297a..8b9ae84 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
@@ -400,7 +400,7 @@ public class LogicalRelImplementor<Row> implements
IgniteRelVisitor<Node<Row>> {
}
/** */
- public Node<Row> go(IgniteRel rel) {
- return visit(rel);
+ public <T extends Node<Row>> T go(IgniteRel rel) {
+ return (T)visit(rel);
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistry.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistry.java
index d49009d..8c7c2d4 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistry.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistry.java
@@ -81,16 +81,20 @@ public interface MailboxRegistry extends Service {
/**
* Returns all registered inboxes for provided query ID.
*
- * @param qryId Query ID. {@code null} means return all registered inboxes.
+ * @param qryId Query ID. {@code null} means return inboxes with any query
id.
+ * @param fragmentId Fragment Id. {@code -1} means return inboxes with any
fragment id.
+ * @param exchangeId Exchange Id. {@code -1} means return inboxes with any
exchange id.
* @return Registered inboxes.
*/
- Collection<Inbox<?>> inboxes(@Nullable UUID qryId);
+ Collection<Inbox<?>> inboxes(@Nullable UUID qryId, long fragmentId, long
exchangeId);
/**
* Returns all registered outboxes for provided query ID.
*
- * @param qryId Query ID. {@code null} means return all registered
outboxes.
+ * @param qryId Query ID. {@code null} means return outboxes with any
query id.
+ * @param fragmentId Fragment Id. {@code -1} means return outboxes with
any fragment id.
+ * @param exchangeId Exchange Id. {@code -1} means return outboxes with
any exchange id.
* @return Registered outboxes.
*/
- Collection<Outbox<?>> outboxes(@Nullable UUID qryId);
+ Collection<Outbox<?>> outboxes(@Nullable UUID qryId, long fragmentId, long
exchangeId);
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistryImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistryImpl.java
index 77e1101..39ace60 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistryImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistryImpl.java
@@ -19,12 +19,18 @@ package
org.apache.ignite.internal.processors.query.calcite.exec;
import java.util.Collection;
import java.util.Map;
+import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
+import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
+import
org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.Inbox;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.Mailbox;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.Outbox;
import
org.apache.ignite.internal.processors.query.calcite.util.AbstractService;
import org.jetbrains.annotations.Nullable;
@@ -34,11 +40,20 @@ import org.jetbrains.annotations.Nullable;
*/
public class MailboxRegistryImpl extends AbstractService implements
MailboxRegistry {
/** */
+ private static final Predicate<Mailbox<?>> ALWAYS_TRUE = o -> true;
+
+ /** */
private final Map<MailboxKey, Outbox<?>> locals;
/** */
private final Map<MailboxKey, Inbox<?>> remotes;
+ /** */
+ private final DiscoveryEventListener discoLsnr;
+
+ /** */
+ private GridEventStorageManager evtMgr;
+
/**
* @param ctx Kernal.
*/
@@ -47,6 +62,25 @@ public class MailboxRegistryImpl extends AbstractService
implements MailboxRegis
locals = new ConcurrentHashMap<>();
remotes = new ConcurrentHashMap<>();
+
+ discoLsnr = (e, c) -> onNodeLeft(e.eventNode().id());
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onStart(GridKernalContext ctx) {
+ eventManager(ctx.event());
+
+ init();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void init() {
+ eventManager().addDiscoveryEventListener(discoLsnr,
EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void tearDown() {
+ eventManager().removeDiscoveryEventListener(discoLsnr,
EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT);
}
/** {@inheritDoc} */
@@ -58,7 +92,7 @@ public class MailboxRegistryImpl extends AbstractService
implements MailboxRegis
/** {@inheritDoc} */
@Override public void unregister(Inbox<?> inbox) {
- remotes.remove(new MailboxKey(inbox.queryId(), inbox.exchangeId()));
+ remotes.remove(new MailboxKey(inbox.queryId(), inbox.exchangeId()),
inbox);
}
/** {@inheritDoc} */
@@ -70,7 +104,7 @@ public class MailboxRegistryImpl extends AbstractService
implements MailboxRegis
/** {@inheritDoc} */
@Override public void unregister(Outbox<?> outbox) {
- locals.remove(new MailboxKey(outbox.queryId(), outbox.exchangeId()));
+ locals.remove(new MailboxKey(outbox.queryId(), outbox.exchangeId()),
outbox);
}
/** {@inheritDoc} */
@@ -84,27 +118,52 @@ public class MailboxRegistryImpl extends AbstractService
implements MailboxRegis
}
/** {@inheritDoc} */
- @Override public Collection<Inbox<?>> inboxes(@Nullable UUID qryId) {
- if (qryId == null)
- return remotes.values();
-
- return remotes.entrySet().stream()
- .filter(e -> e.getKey().qryId.equals(qryId))
- .map(Map.Entry::getValue)
+ @Override public Collection<Inbox<?>> inboxes(@Nullable UUID qryId, long
fragmentId, long exchangeId) {
+ return remotes.values().stream()
+ .filter(makeFilter(qryId, fragmentId, exchangeId))
.collect(Collectors.toList());
}
/** {@inheritDoc} */
- @Override public Collection<Outbox<?>> outboxes(@Nullable UUID qryId) {
- if (qryId == null)
- return locals.values();
-
- return locals.entrySet().stream()
- .filter(e -> e.getKey().qryId.equals(qryId))
- .map(Map.Entry::getValue)
+ @Override public Collection<Outbox<?>> outboxes(@Nullable UUID qryId, long
fragmentId, long exchangeId) {
+ return locals.values().stream()
+ .filter(makeFilter(qryId, fragmentId, exchangeId))
.collect(Collectors.toList());
}
+ /**
+ * @param evtMgr Event manager.
+ */
+ public void eventManager(GridEventStorageManager evtMgr) {
+ this.evtMgr = evtMgr;
+ }
+
+ /**
+ * @return Event manager.
+ */
+ public GridEventStorageManager eventManager() {
+ return evtMgr;
+ }
+
+ /** */
+ private void onNodeLeft(UUID nodeId) {
+ locals.values().forEach(n -> n.onNodeLeft(nodeId));
+ remotes.values().forEach(n -> n.onNodeLeft(nodeId));
+ }
+
+ /** */
+ private static Predicate<Mailbox<?>> makeFilter(@Nullable UUID qryId, long
fragmentId, long exchangeId) {
+ Predicate<Mailbox<?>> filter = ALWAYS_TRUE;
+ if (qryId != null)
+ filter = filter.and(mailbox -> Objects.equals(mailbox.queryId(),
qryId));
+ if (fragmentId != -1)
+ filter = filter.and(mailbox -> mailbox.fragmentId() == fragmentId);
+ if (exchangeId != -1)
+ filter = filter.and(mailbox -> mailbox.exchangeId() == exchangeId);
+
+ return filter;
+ }
+
/** */
private static class MailboxKey {
/** */
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java
deleted file mode 100644
index 090abc7..0000000
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.calcite.exec;
-
-import java.util.ArrayDeque;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.Queue;
-
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheStoppedException;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
-import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
-import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
-import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
-import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
-import org.apache.ignite.internal.processors.query.IgniteSQLException;
-import
org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory;
-import
org.apache.ignite.internal.processors.query.calcite.schema.TableDescriptor;
-import org.apache.ignite.internal.processors.query.calcite.util.Commons;
-import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
-import org.apache.ignite.internal.util.lang.GridCursor;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-/** */
-public class TableScan<Row> implements Iterable<Row> {
- /** */
- private final ExecutionContext<Row> ectx;
-
- /** */
- private final TableDescriptor desc;
-
- /** */
- private final RowFactory<Row> factory;
-
- /** */
- public TableScan(ExecutionContext<Row> ectx, TableDescriptor desc) {
- this.ectx = ectx;
- this.desc = desc;
-
- RelDataType rowType = desc.selectRowType(ectx.getTypeFactory());
- factory = ectx.rowHandler().factory(ectx.getTypeFactory(), rowType);
- }
-
- /** {@inheritDoc} */
- @Override public Iterator<Row> iterator() {
- try {
- return new IteratorImpl().init();
- }
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
- }
-
- /**
- * Table scan iterator.
- */
- private class IteratorImpl extends GridCloseableIteratorAdapter<Row> {
- /** */
- private int cacheId;
-
- /** */
- private Queue<GridDhtLocalPartition> parts;
-
- /** */
- private GridDhtLocalPartition part;
-
- /** */
- private GridCursor<? extends CacheDataRow> cur;
-
- /** */
- private Row next;
-
- /** {@inheritDoc} */
- @Override protected Row onNext() {
- if (next == null)
- throw new NoSuchElementException();
-
- Row next = this.next;
-
- this.next = null;
-
- return next;
- }
-
- /** {@inheritDoc} */
- @Override protected boolean onHasNext() throws IgniteCheckedException {
- assert parts != null;
-
- if (next != null)
- return true;
-
- while (true) {
- if (cur == null) {
- if ((part = parts.poll()) == null)
- break;
-
- cur = part.dataStore().cursor(cacheId,
ectx.mvccSnapshot());
- }
-
- if (cur.next()) {
- CacheDataRow row = cur.get();
-
- if (!desc.match(row))
- continue;
-
- next = desc.toRow(ectx, row, factory);
-
- break;
- } else {
- cur = null;
-
- part.release();
- part = null;
- }
- }
-
- return next != null;
- }
-
- /** {@inheritDoc} */
- @Override protected void onClose() {
- if (part != null)
- part.release();
-
- part = null;
-
- while (!F.isEmpty(parts))
- parts.poll().release();
-
- parts = null;
- }
-
- /** */
- public Iterator<Row> init() throws IgniteCheckedException {
- if (isClosed())
- return Collections.emptyIterator();
-
- GridCacheContext<?, ?> cctx = desc.cacheContext();
-
- if (!cctx.gate().enterIfNotStopped()) {
- close();
-
- throw new CacheStoppedException(cctx.name());
- }
-
- try {
- GridDhtPartitionTopology top = cctx.topology();
-
- top.readLock();
- try {
- GridDhtTopologyFuture fut = top.topologyVersionFuture();
- AffinityTopologyVersion topVer =
ectx.planningContext().topologyVersion();
-
- if (!fut.isDone() ||
fut.topologyVersion().compareTo(topVer) != 0)
- throw new ClusterTopologyCheckedException("Failed to
execute query. Retry on stable topology.");
-
- if (cctx.isPartitioned())
- reservePartitioned(top);
- else
- reserveReplicated(top);
- }
- finally {
- top.readUnlock();
- }
-
- cacheId = cctx.cacheId();
-
- return this;
- }
- catch (Exception e) {
- Commons.closeQuiet(this, e);
-
- throw e;
- }
- finally {
- cctx.gate().leave();
- }
- }
-
- /** */
- private void reserveReplicated(GridDhtPartitionTopology top) {
- List<GridDhtLocalPartition> locParts = top.localPartitions();
-
- parts = new ArrayDeque<>(locParts.size());
-
- for (GridDhtLocalPartition local : locParts) {
- if (!local.reserve())
- throw reservationException();
- else if (local.state() != GridDhtPartitionState.OWNING) {
- local.release();
-
- throw reservationException();
- }
-
- parts.offer(local);
- }
- }
-
- /** */
- private void reservePartitioned(GridDhtPartitionTopology top) {
- AffinityTopologyVersion topVer =
ectx.planningContext().topologyVersion();
- int[] partitions = ectx.partitions();
-
- assert topVer != null && !F.isEmpty(partitions);
-
- parts = new ArrayDeque<>(partitions.length);
-
- for (int p : partitions) {
- GridDhtLocalPartition loc = top.localPartition(p, topVer,
false);
-
- if (loc == null || !loc.reserve())
- throw reservationException();
- else if (loc.state() != GridDhtPartitionState.OWNING) {
- loc.release();
-
- throw reservationException();
- }
-
- parts.offer(loc);
- }
- }
-
- /** */
- private IgniteSQLException reservationException() {
- return new IgniteSQLException("Failed to reserve partition for
query execution. Retry on stable topology.");
- }
- }
-}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractJoinNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractJoinNode.java
index 5332485..a64e48b 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractJoinNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractJoinNode.java
@@ -71,6 +71,9 @@ public abstract class AbstractJoinNode<Row> extends
AbstractNode<Row> {
@Override public void request(int rowsCnt) {
checkThread();
+ if (isClosed())
+ return;
+
assert !F.isEmpty(sources) && sources.size() == 2;
assert rowsCnt > 0 && requested == 0;
@@ -124,6 +127,9 @@ public abstract class AbstractJoinNode<Row> extends
AbstractNode<Row> {
private void pushLeft(Row row) {
checkThread();
+ if (isClosed())
+ return;
+
assert downstream != null;
assert waitingLeft > 0;
@@ -138,6 +144,9 @@ public abstract class AbstractJoinNode<Row> extends
AbstractNode<Row> {
private void pushRight(Row row) {
checkThread();
+ if (isClosed())
+ return;
+
assert downstream != null;
assert waitingRight > 0;
@@ -153,6 +162,9 @@ public abstract class AbstractJoinNode<Row> extends
AbstractNode<Row> {
private void endLeft() {
checkThread();
+ if (isClosed())
+ return;
+
assert downstream != null;
assert waitingLeft > 0;
@@ -165,6 +177,9 @@ public abstract class AbstractJoinNode<Row> extends
AbstractNode<Row> {
private void endRight() {
checkThread();
+ if (isClosed())
+ return;
+
assert downstream != null;
assert waitingRight > 0;
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java
index 67a2718..bd1ad5e 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java
@@ -37,10 +37,10 @@ public abstract class AbstractNode<Row> implements
Node<Row> {
protected static final int MODIFY_BATCH_SIZE =
IgniteSystemProperties.getInteger("IGNITE_CALCITE_EXEC_BATCH_SIZE", 100);
/** */
- protected static final int IO_BATCH_SIZE =
IgniteSystemProperties.getInteger("IGNITE_CALCITE_EXEC_IO_BATCH_SIZE", 200);
+ protected static final int IO_BATCH_SIZE =
IgniteSystemProperties.getInteger("IGNITE_CALCITE_EXEC_IO_BATCH_SIZE", 256);
/** */
- protected static final int IO_BATCH_CNT =
IgniteSystemProperties.getInteger("IGNITE_CALCITE_EXEC_IO_BATCH_CNT", 50);
+ protected static final int IO_BATCH_CNT =
IgniteSystemProperties.getInteger("IGNITE_CALCITE_EXEC_IO_BATCH_CNT", 4);
/** for debug purpose */
private volatile Thread thread;
@@ -58,6 +58,9 @@ public abstract class AbstractNode<Row> implements Node<Row> {
/** */
protected List<Node<Row>> sources;
+ /** */
+ protected boolean closed;
+
/**
* @param ctx Execution context.
*/
@@ -84,13 +87,23 @@ public abstract class AbstractNode<Row> implements
Node<Row> {
}
/** {@inheritDoc} */
- @Override public void cancel() {
+ @Override public void close() {
checkThread();
- context().markCancelled();
+ if (closed)
+ return;
+
+ closed = true;
if (!F.isEmpty(sources))
- sources.forEach(Node::cancel);
+ sources.forEach(U::closeQuiet);
+ }
+
+ /**
+ * @return {@code true} if the subtree is canceled.
+ */
+ protected boolean isClosed() {
+ return closed;
}
/** */
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AggregateNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AggregateNode.java
index 347f533..f496b56 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AggregateNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AggregateNode.java
@@ -97,6 +97,9 @@ public class AggregateNode<Row> extends AbstractNode<Row>
implements SingleNode<
@Override public void request(int rowsCnt) {
checkThread();
+ if (isClosed())
+ return;
+
assert !F.isEmpty(sources) && sources.size() == 1;
assert rowsCnt > 0 && requested == 0;
@@ -114,6 +117,9 @@ public class AggregateNode<Row> extends AbstractNode<Row>
implements SingleNode<
@Override public void push(Row row) {
checkThread();
+ if (isClosed())
+ return;
+
assert downstream != null;
assert waiting > 0;
@@ -135,6 +141,9 @@ public class AggregateNode<Row> extends AbstractNode<Row>
implements SingleNode<
@Override public void end() {
checkThread();
+ if (isClosed())
+ return;
+
assert downstream != null;
assert waiting > 0;
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/FilterNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/FilterNode.java
index e7202fd..19c87d3 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/FilterNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/FilterNode.java
@@ -57,6 +57,9 @@ public class FilterNode<Row> extends AbstractNode<Row>
implements SingleNode<Row
@Override public void request(int rowsCnt) {
checkThread();
+ if (isClosed())
+ return;
+
assert !F.isEmpty(sources) && sources.size() == 1;
assert rowsCnt > 0 && requested == 0;
@@ -70,6 +73,9 @@ public class FilterNode<Row> extends AbstractNode<Row>
implements SingleNode<Row
@Override public void push(Row row) {
checkThread();
+ if (isClosed())
+ return;
+
assert downstream != null;
assert waiting > 0;
@@ -90,6 +96,9 @@ public class FilterNode<Row> extends AbstractNode<Row>
implements SingleNode<Row
@Override public void end() {
checkThread();
+ if (isClosed())
+ return;
+
assert downstream != null;
assert waiting > 0;
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java
index ce82e36..5e7e281 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java
@@ -17,27 +17,30 @@
package org.apache.ignite.internal.processors.query.calcite.exec.rel;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.UUID;
+import java.util.stream.Collectors;
import org.apache.calcite.util.Pair;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import
org.apache.ignite.internal.processors.query.calcite.exec.ExchangeService;
import
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
import
org.apache.ignite.internal.processors.query.calcite.exec.MailboxRegistry;
import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
/**
* A part of exchange.
*/
-public class Inbox<Row> extends AbstractNode<Row> implements SingleNode<Row>,
AutoCloseable {
+public class Inbox<Row> extends AbstractNode<Row> implements Mailbox<Row>,
SingleNode<Row> {
/** */
private final ExchangeService exchange;
@@ -54,7 +57,7 @@ public class Inbox<Row> extends AbstractNode<Row> implements
SingleNode<Row>, Au
private final Map<UUID, Buffer> perNodeBuffers;
/** */
- private Collection<UUID> nodes;
+ private volatile Collection<UUID> srcNodeIds;
/** */
private Comparator<Row> comp;
@@ -92,17 +95,8 @@ public class Inbox<Row> extends AbstractNode<Row> implements
SingleNode<Row>, Au
perNodeBuffers = new HashMap<>();
}
- /**
- * @return Query ID.
- */
- public UUID queryId() {
- return context().queryId();
- }
-
- /**
- * @return Exchange ID.
- */
- public long exchangeId() {
+ /** {@inheritDoc} */
+ @Override public long exchangeId() {
return exchangeId;
}
@@ -110,48 +104,44 @@ public class Inbox<Row> extends AbstractNode<Row>
implements SingleNode<Row>, Au
* Inits this Inbox.
*
* @param ctx Execution context.
- * @param nodes Source nodes.
+ * @param srcNodeIds Source node IDs.
* @param comp Optional comparator for merge exchange.
*/
- public void init(ExecutionContext<Row> ctx, Collection<UUID> nodes,
Comparator<Row> comp) {
+ public void init(ExecutionContext<Row> ctx, Collection<UUID> srcNodeIds,
@Nullable Comparator<Row> comp) {
// It's important to set proper context here because
// because the one, that is created on a first message
- // recived doesn't have all context variables in place.
+ // received doesn't have all context variables in place.
this.ctx = ctx;
- this.nodes = nodes;
this.comp = comp;
+
+ // memory barier
+ this.srcNodeIds = new HashSet<>(srcNodeIds);
}
/** {@inheritDoc} */
@Override public void request(int rowsCnt) {
checkThread();
- assert nodes != null;
+ if (isClosed())
+ return;
+
+ assert srcNodeIds != null;
assert rowsCnt > 0 && requested == 0;
requested = rowsCnt;
- if (buffers == null) {
- nodes.forEach(this::getOrCreateBuffer);
- buffers = new ArrayList<>(perNodeBuffers.values());
-
- assert buffers.size() == nodes.size();
- }
-
if (!inLoop)
context().execute(this::pushInternal);
}
/** {@inheritDoc} */
- @Override public void cancel() {
- checkThread();
- context().markCancelled();
- close();
- }
-
- /** {@inheritDoc} */
@Override public void close() {
+ if (isClosed())
+ return;
+
registry.unregister(this);
+
+ super.close();
}
/** {@inheritDoc} */
@@ -175,6 +165,9 @@ public class Inbox<Row> extends AbstractNode<Row>
implements SingleNode<Row>, Au
public void onBatchReceived(UUID src, int batchId, boolean last, List<Row>
rows) {
checkThread();
+ if (isClosed())
+ return;
+
Buffer buf = getOrCreateBuffer(src);
boolean waitingBefore = buf.check() == State.WAITING;
@@ -185,20 +178,47 @@ public class Inbox<Row> extends AbstractNode<Row>
implements SingleNode<Row>, Au
pushInternal();
}
+ /**
+ * @param e Error.
+ */
+ private void onError(Throwable e) {
+ checkThread();
+
+ assert downstream != null;
+
+ downstream.onError(e);
+
+ close();
+ }
+
/** */
private void pushInternal() {
+ if (isClosed())
+ return;
+
assert downstream != null;
inLoop = true;
+
try {
+ if (buffers == null) {
+ for (UUID node : srcNodeIds)
+ checkNode(node);
+
+ buffers = srcNodeIds.stream()
+ .map(this::getOrCreateBuffer)
+ .collect(Collectors.toList());
+
+ assert buffers.size() == perNodeBuffers.size();
+ }
+
if (comp != null)
pushOrdered();
else
pushUnordered();
}
- catch (Exception e) {
- downstream.onError(e);
- close();
+ catch (Throwable e) {
+ onError(e);
}
finally {
inLoop = false;
@@ -231,7 +251,7 @@ public class Inbox<Row> extends AbstractNode<Row>
implements SingleNode<Row>, Au
}
while (requested > 0 && !heap.isEmpty()) {
- if (context().cancelled())
+ if (isClosed())
return;
Buffer buf = heap.poll().right;
@@ -259,8 +279,6 @@ public class Inbox<Row> extends AbstractNode<Row>
implements SingleNode<Row>, Au
downstream.end();
requested = 0;
-
- close();
}
}
@@ -269,7 +287,7 @@ public class Inbox<Row> extends AbstractNode<Row>
implements SingleNode<Row>, Au
int idx = 0, noProgress = 0;
while (requested > 0 && !buffers.isEmpty()) {
- if (context().cancelled())
+ if (isClosed())
return;
Buffer buf = buffers.get(idx);
@@ -299,8 +317,6 @@ public class Inbox<Row> extends AbstractNode<Row>
implements SingleNode<Row>, Au
if (requested > 0 && buffers.isEmpty()) {
downstream.end();
requested = 0;
-
- close();
}
}
@@ -320,6 +336,28 @@ public class Inbox<Row> extends AbstractNode<Row>
implements SingleNode<Row>, Au
}
/** */
+ public void onNodeLeft(UUID nodeId) {
+ if (ctx.originatingNodeId().equals(nodeId) && srcNodeIds == null)
+ ctx.execute(this::close);
+ else if (srcNodeIds != null && srcNodeIds.contains(nodeId))
+ ctx.execute(() -> onNodeLeft0(nodeId));
+ }
+
+ /** */
+ private void onNodeLeft0(UUID nodeId) {
+ checkThread();
+
+ if (getOrCreateBuffer(nodeId).check() != State.END)
+ onError(new ClusterTopologyCheckedException("Node left [nodeId=" +
nodeId + ']'));
+ }
+
+ /** */
+ private void checkNode(UUID nodeId) throws ClusterTopologyCheckedException
{
+ if (!exchange.alive(nodeId))
+ throw new ClusterTopologyCheckedException("Node left [nodeId=" +
nodeId + ']');
+ }
+
+ /** */
private static final class Batch<Row> implements Comparable<Batch<Row>> {
/** */
private final int batchId;
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Mailbox.java
similarity index 58%
copy from
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java
copy to
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Mailbox.java
index 3444347..0dc7077 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Mailbox.java
@@ -15,30 +15,28 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite.message;
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
import java.util.UUID;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.query.calcite.util.Service;
+/** */
+public interface Mailbox<T> extends Node<T> {
+ /**
+ * @return Query ID.
+ */
+ default UUID queryId() {
+ return context().queryId();
+ }
-/**
- *
- */
-public interface MessageService extends Service {
/**
- * Sends a message to given node.
- *
- * @param nodeId Node ID.
- * @param msg Message.
+ * @return Fragment ID.
*/
- void send(UUID nodeId, CalciteMessage msg) throws IgniteCheckedException;
+ default long fragmentId() {
+ return context().fragmentId();
+ }
/**
- * Registers a listener for messages of a given type.
- *
- * @param lsnr Listener.
- * @param type Message type.
+ * @return Exchange ID.
*/
- void register(MessageListener lsnr, MessageType type);
+ long exchangeId();
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java
index d291413..773a173 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java
@@ -91,6 +91,9 @@ public class ModifyNode<Row> extends AbstractNode<Row>
implements SingleNode<Row
@Override public void request(int rowsCnt) {
checkThread();
+ if (isClosed())
+ return;
+
assert !F.isEmpty(sources) && sources.size() == 1;
assert rowsCnt > 0 && requested == 0;
@@ -104,6 +107,9 @@ public class ModifyNode<Row> extends AbstractNode<Row>
implements SingleNode<Row
@Override public void push(Row row) {
checkThread();
+ if (isClosed())
+ return;
+
assert downstream != null;
assert waiting > 0;
assert state == State.UPDATING;
@@ -134,6 +140,9 @@ public class ModifyNode<Row> extends AbstractNode<Row>
implements SingleNode<Row
@Override public void end() {
checkThread();
+ if (isClosed())
+ return;
+
assert downstream != null;
assert waiting > 0;
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Node.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Node.java
index 33f14ba..6c99f4d 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Node.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Node.java
@@ -27,7 +27,7 @@ import
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext
* <p/><b>Note</b>: except several cases (like consumer node and mailboxes),
{@link Node#request(int)}, {@link Node#cancel()},
* {@link Downstream#push(Object)} and {@link Downstream#end()} methods should
be used from one single thread.
*/
-public interface Node<Row> {
+public interface Node<Row> extends AutoCloseable {
/**
* Returns runtime context allowing access to the tables in a database.
*
@@ -53,9 +53,4 @@ public interface Node<Row> {
* Requests next bunch of rows.
*/
void request(int rowsCnt);
-
- /**
- * Cancels execution.
- */
- void cancel();
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java
index b81223f..199f528 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java
@@ -38,7 +38,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
/**
* A part of exchange.
*/
-public class Outbox<Row> extends AbstractNode<Row> implements SingleNode<Row>,
Downstream<Row>, AutoCloseable {
+public class Outbox<Row> extends AbstractNode<Row> implements Mailbox<Row>,
SingleNode<Row>, Downstream<Row> {
/** */
private final ExchangeService exchange;
@@ -61,9 +61,6 @@ public class Outbox<Row> extends AbstractNode<Row> implements
SingleNode<Row>, D
private final Map<UUID, Buffer> nodeBuffers = new HashMap<>();
/** */
- private boolean cancelled;
-
- /** */
private int waiting;
/**
@@ -78,8 +75,8 @@ public class Outbox<Row> extends AbstractNode<Row> implements
SingleNode<Row>, D
ExecutionContext<Row> ctx,
ExchangeService exchange,
MailboxRegistry registry,
- long exchangeId, long
- targetFragmentId,
+ long exchangeId,
+ long targetFragmentId,
Destination dest
) {
super(ctx);
@@ -90,17 +87,8 @@ public class Outbox<Row> extends AbstractNode<Row>
implements SingleNode<Row>, D
this.dest = dest;
}
- /**
- * @return Query ID.
- */
- public UUID queryId() {
- return context().queryId();
- }
-
- /**
- * @return Exchange ID.
- */
- public long exchangeId() {
+ /** {@inheritDoc} */
+ @Override public long exchangeId() {
return exchangeId;
}
@@ -111,7 +99,11 @@ public class Outbox<Row> extends AbstractNode<Row>
implements SingleNode<Row>, D
* @param batchId Batch ID.
*/
public void onAcknowledge(UUID nodeId, int batchId) {
- nodeBuffers.get(nodeId).onAcknowledge(batchId);
+ Buffer buffer = nodeBuffers.get(nodeId);
+
+ assert buffer != null;
+
+ buffer.onAcknowledge(batchId);
}
/** */
@@ -125,6 +117,9 @@ public class Outbox<Row> extends AbstractNode<Row>
implements SingleNode<Row>, D
@Override public void push(Row row) {
checkThread();
+ if (isClosed())
+ return;
+
assert waiting > 0;
waiting--;
@@ -138,6 +133,9 @@ public class Outbox<Row> extends AbstractNode<Row>
implements SingleNode<Row>, D
@Override public void end() {
checkThread();
+ if (isClosed())
+ return;
+
assert waiting > 0;
waiting = -1;
@@ -145,8 +143,6 @@ public class Outbox<Row> extends AbstractNode<Row>
implements SingleNode<Row>, D
try {
for (UUID node : dest.targets())
getOrCreateBuffer(node).end();
-
- close();
}
catch (Exception e) {
onError(e);
@@ -158,30 +154,29 @@ public class Outbox<Row> extends AbstractNode<Row>
implements SingleNode<Row>, D
U.error(context().planningContext().logger(),
"Error occurred during execution: " + X.getFullStackTrace(e));
- cancel(); // TODO send cause to originator.
- }
-
- /** {@inheritDoc} */
- @Override public void cancel() {
- checkThread();
-
- context().markCancelled();
-
- if (cancelled)
- return;
-
- cancelled = true;
-
- nodeBuffers.values().forEach(Buffer::cancel);
+ try {
+ sendError(e);
+ }
+ catch (IgniteCheckedException ex) {
+ U.error(context().planningContext().logger(),
+ "Error occurred during send error message: " +
X.getFullStackTrace(e));
+ }
close();
-
- super.cancel();
}
/** {@inheritDoc} */
@Override public void close() {
+ if (isClosed())
+ return;
+
registry.unregister(this);
+
+ // Send cancel message for the Inbox to close Inboxes created by batch
message race.
+ for (UUID node : dest.targets())
+ getOrCreateBuffer(node).close();
+
+ super.close();
}
/** {@inheritDoc} */
@@ -208,9 +203,14 @@ public class Outbox<Row> extends AbstractNode<Row>
implements SingleNode<Row>, D
}
/** */
- private void sendCancel(UUID nodeId, int batchId) {
+ private void sendError(Throwable err) throws IgniteCheckedException {
+ exchange.sendError(ctx.originatingNodeId(), queryId(), fragmentId(),
err);
+ }
+
+ /** */
+ private void sendInboxClose(UUID nodeId) {
try {
- exchange.cancel(nodeId, queryId(), targetFragmentId, exchangeId,
batchId);
+ exchange.closeInbox(nodeId, queryId(), targetFragmentId,
exchangeId);
}
catch (IgniteCheckedException e) {
U.warn(context().planningContext().logger(), "Failed to send
cancel message.", e);
@@ -264,6 +264,12 @@ public class Outbox<Row> extends AbstractNode<Row>
implements SingleNode<Row>, D
}
/** */
+ public void onNodeLeft(UUID nodeId) {
+ if (nodeId.equals(ctx.originatingNodeId()))
+ ctx.execute(this::close);
+ }
+
+ /** */
private final class Buffer {
/** */
private final UUID nodeId;
@@ -318,15 +324,18 @@ public class Outbox<Row> extends AbstractNode<Row>
implements SingleNode<Row>, D
}
/** */
- public void cancel() {
+ public void close() {
+ int currBatchId = hwm;
+
if (hwm == Integer.MAX_VALUE)
return;
- int batchId = hwm + 1;
hwm = Integer.MAX_VALUE;
curr = null;
- sendCancel(nodeId, batchId);
+
+ if (currBatchId >= 0)
+ sendInboxClose(nodeId);
}
/**
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ProjectNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ProjectNode.java
index 0c2c2ab..afe7fc0 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ProjectNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ProjectNode.java
@@ -43,6 +43,9 @@ public class ProjectNode<Row> extends AbstractNode<Row>
implements SingleNode<Ro
@Override public void request(int rowsCnt) {
checkThread();
+ if (isClosed())
+ return;
+
assert !F.isEmpty(sources) && sources.size() == 1;
assert rowsCnt > 0;
@@ -53,6 +56,9 @@ public class ProjectNode<Row> extends AbstractNode<Row>
implements SingleNode<Ro
@Override public void push(Row row) {
checkThread();
+ if (isClosed())
+ return;
+
assert downstream != null;
try {
@@ -67,6 +73,9 @@ public class ProjectNode<Row> extends AbstractNode<Row>
implements SingleNode<Ro
@Override public void end() {
checkThread();
+ if (isClosed())
+ return;
+
assert downstream != null;
downstream.end();
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
index ca4c370..68da4fd 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
@@ -22,9 +22,9 @@ import java.util.Deque;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.Consumer;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
@@ -35,8 +35,7 @@ import org.apache.ignite.internal.util.typedef.F;
/**
* Client iterator.
*/
-public class RootNode<Row> extends AbstractNode<Row>
- implements SingleNode<Row>, Downstream<Row>, Iterator<Row>, AutoCloseable {
+public class RootNode<Row> extends AbstractNode<Row> implements
SingleNode<Row>, Downstream<Row>, Iterator<Row> {
/** */
private final ReentrantLock lock;
@@ -47,13 +46,13 @@ public class RootNode<Row> extends AbstractNode<Row>
private final Deque<Row> buff;
/** */
- private final Consumer<RootNode<Row>> onClose;
+ private final Runnable onClose;
/** */
private volatile State state = State.RUNNING;
/** */
- private volatile IgniteSQLException ex;
+ private final AtomicReference<Throwable> ex = new AtomicReference<>();
/** */
private Row row;
@@ -64,15 +63,27 @@ public class RootNode<Row> extends AbstractNode<Row>
/**
* @param ctx Execution context.
*/
- public RootNode(ExecutionContext<Row> ctx, Consumer<RootNode<Row>>
onClose) {
+ public RootNode(ExecutionContext<Row> ctx) {
super(ctx);
- this.onClose = onClose;
+ buff = new ArrayDeque<>(IN_BUFFER_SIZE);
+ lock = new ReentrantLock();
+ cond = lock.newCondition();
- // extra space for possible END marker
- buff = new ArrayDeque<>(IN_BUFFER_SIZE + 1);
+ onClose = this::proceedClose;
+ }
+
+ /**
+ * @param ctx Execution context.
+ */
+ public RootNode(ExecutionContext<Row> ctx, Runnable onClose) {
+ super(ctx);
+
+ buff = new ArrayDeque<>(IN_BUFFER_SIZE);
lock = new ReentrantLock();
cond = lock.newCondition();
+
+ this.onClose = onClose;
}
/** */
@@ -80,39 +91,38 @@ public class RootNode<Row> extends AbstractNode<Row>
return context().queryId();
}
- /** {@inheritDoc} */
- @Override public void cancel() {
- if (state != State.RUNNING)
- return;
+ /** */
+ public void proceedClose() {
+ context().execute(() -> {
+ checkThread();
+ if (isClosed())
+ return;
+
+ buff.clear();
+
+ super.close();
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() {
lock.lock();
try {
- if (state != State.RUNNING)
+ if (state == State.RUNNING)
+ state = State.CANCELLED;
+ else if (state == State.END)
+ state = State.CLOSED;
+ else
return;
- context().markCancelled();
- state = State.CANCELLED;
- buff.clear();
cond.signalAll();
}
finally {
lock.unlock();
}
-
- context().execute(F.first(sources)::cancel);
- onClose.accept(this);
- }
- /**
- * @return Execution state.
- */
- public State state() {
- return state;
- }
-
- /** {@inheritDoc} */
- @Override public void close() {
- cancel();
+ onClose.run();
}
/** {@inheritDoc} */
@@ -123,7 +133,7 @@ public class RootNode<Row> extends AbstractNode<Row>
lock.lock();
try {
- assert waiting > 0;
+ assert waiting > 0 : "waiting=" + waiting;
waiting--;
@@ -149,7 +159,7 @@ public class RootNode<Row> extends AbstractNode<Row>
@Override public void end() {
lock.lock();
try {
- assert waiting > 0;
+ assert waiting > 0 : "waiting=" + waiting;
waiting = -1;
@@ -165,18 +175,17 @@ public class RootNode<Row> extends AbstractNode<Row>
/** {@inheritDoc} */
@Override public void onError(Throwable e) {
- checkThread();
+ if (!ex.compareAndSet(null, e))
+ ex.get().addSuppressed(e);
- ex = new IgniteSQLException("An error occurred while query
executing.", IgniteQueryErrorCode.UNKNOWN, e);
-
- cancel();
+ close();
}
/** {@inheritDoc} */
@Override public boolean hasNext() {
if (row != null)
return true;
- else if (state == State.END)
+ else if (state == State.END || state == State.CLOSED)
return false;
else
return (row = take()) != null;
@@ -217,10 +226,10 @@ public class RootNode<Row> extends AbstractNode<Row>
lock.lock();
try {
- checkCancelled();
- assert state == State.RUNNING;
-
while (true) {
+ checkCancelled();
+ assert state == State.RUNNING;
+
if (!buff.isEmpty())
return buff.poll();
else if (waiting == -1)
@@ -231,9 +240,6 @@ public class RootNode<Row> extends AbstractNode<Row>
}
cond.await();
-
- checkCancelled();
- assert state == State.RUNNING;
}
state = State.END;
@@ -247,7 +253,7 @@ public class RootNode<Row> extends AbstractNode<Row>
assert state == State.END;
- onClose.accept(this);
+ close();
return null;
}
@@ -255,22 +261,21 @@ public class RootNode<Row> extends AbstractNode<Row>
/** */
private void checkCancelled() {
if (state == State.CANCELLED) {
- if (ex != null)
- throw ex;
+ ex.compareAndSet(null, new IgniteSQLException("The query was
cancelled while executing.", IgniteQueryErrorCode.QUERY_CANCELED));
- throw new IgniteSQLException("The query was cancelled while
executing.", IgniteQueryErrorCode.QUERY_CANCELED);
+ throw sqlException(ex.get());
}
}
/** */
- public enum State {
- /** */
- RUNNING,
-
- /** */
- CANCELLED,
+ private IgniteSQLException sqlException(Throwable e) {
+ return e instanceof IgniteSQLException
+ ? (IgniteSQLException)e
+ : new IgniteSQLException("An error occurred while query
executing.", IgniteQueryErrorCode.UNKNOWN, e);
+ }
- /** */
- END
+ /** */
+ private enum State {
+ RUNNING, CANCELLED, END, CLOSED
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
index 72df1e4..fae9530 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
@@ -27,7 +27,7 @@ import
org.apache.ignite.internal.processors.query.calcite.util.Commons;
/**
* Scan node.
*/
-public class ScanNode<Row> extends AbstractNode<Row> implements
SingleNode<Row>, AutoCloseable {
+public class ScanNode<Row> extends AbstractNode<Row> implements
SingleNode<Row> {
/** */
private final Iterable<Row> src;
@@ -54,6 +54,9 @@ public class ScanNode<Row> extends AbstractNode<Row>
implements SingleNode<Row>,
@Override public void request(int rowsCnt) {
checkThread();
+ if (isClosed())
+ return;
+
assert rowsCnt > 0 && requested == 0;
requested = rowsCnt;
@@ -63,15 +66,17 @@ public class ScanNode<Row> extends AbstractNode<Row>
implements SingleNode<Row>,
}
/** {@inheritDoc} */
- @Override public void cancel() {
- checkThread();
- context().markCancelled();
- close();
- }
-
- /** {@inheritDoc} */
@Override public void close() {
+ if (isClosed())
+ return;
+
Commons.closeQuiet(it);
+
+ it = null;
+
+ Commons.closeQuiet(src);
+
+ super.close();
}
/** {@inheritDoc} */
@@ -86,6 +91,9 @@ public class ScanNode<Row> extends AbstractNode<Row>
implements SingleNode<Row>,
/** */
private void pushInternal() {
+ if (isClosed())
+ return;
+
inLoop = true;
try {
if (it == null)
@@ -96,7 +104,7 @@ public class ScanNode<Row> extends AbstractNode<Row>
implements SingleNode<Row>,
Thread thread = Thread.currentThread();
while (requested > 0 && it.hasNext()) {
- if (context().cancelled())
+ if (isClosed())
return;
if (thread.isInterrupted())
@@ -117,16 +125,27 @@ public class ScanNode<Row> extends AbstractNode<Row>
implements SingleNode<Row>,
downstream.end();
requested = 0;
- close();
+ Commons.closeQuiet(it);
+
+ it = null;
}
}
catch (Throwable e) {
- close();
-
- downstream.onError(e);
+ onError(e);
}
finally {
inLoop = false;
}
}
+
+ /** */
+ private void onError(Throwable e) {
+ checkThread();
+
+ assert downstream != null;
+
+ downstream.onError(e);
+
+ close();
+ }
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java
index 79943b3..64c236d 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java
@@ -60,23 +60,28 @@ public class SortNode<Row> extends AbstractNode<Row>
implements SingleNode<Row>,
@Override public void request(int rowsCnt) {
checkThread();
+ if (isClosed())
+ return;
+
assert !F.isEmpty(sources) && sources.size() == 1;
assert rowsCnt > 0 && requested == 0;
+ assert waiting <= 0;
requested = rowsCnt;
- if (waiting == -1 && !inLoop)
- context().execute(this::flushFromBuffer);
- else if (waiting == 0)
+ if (waiting == 0)
F.first(sources).request(waiting = IN_BUFFER_SIZE);
- else
- throw new AssertionError();
+ else if (!inLoop)
+ context().execute(this::flushFromBuffer);
}
/** {@inheritDoc} */
@Override public void push(Row row) {
checkThread();
+ if (isClosed())
+ return;
+
assert downstream != null;
assert waiting > 0;
@@ -97,6 +102,9 @@ public class SortNode<Row> extends AbstractNode<Row>
implements SingleNode<Row>,
@Override public void end() {
checkThread();
+ if (isClosed())
+ return;
+
assert downstream != null;
assert waiting > 0;
@@ -123,28 +131,16 @@ public class SortNode<Row> extends AbstractNode<Row>
implements SingleNode<Row>,
private void flushFromBuffer() {
assert waiting == -1;
- inLoop = true;
+ int processed = 0;
+ inLoop = true;
try {
- int processed = 0;
+ while (requested > 0 && !rows.isEmpty()) {
+ requested--;
- while (requested > 0) {
- int toSnd = Math.min(requested, IN_BUFFER_SIZE - processed);
+ downstream.push(rows.poll());
- for (int i = 0; i < toSnd; i++) {
- requested--;
-
- if (rows.isEmpty())
- break;
-
- Row row = rows.poll();
-
- downstream.push(row);
-
- processed++;
- }
-
- if (processed >= IN_BUFFER_SIZE && requested > 0) {
+ if (++processed >= IN_BUFFER_SIZE && requested > 0) {
// allow others to do their job
context().execute(this::flushFromBuffer);
@@ -156,7 +152,6 @@ public class SortNode<Row> extends AbstractNode<Row>
implements SingleNode<Row>,
downstream.end();
requested = 0;
}
-
}
finally {
inLoop = false;
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/InboxCancelMessage.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ErrorMessage.java
similarity index 70%
copy from
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/InboxCancelMessage.java
copy to
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ErrorMessage.java
index 933d656..de45c18 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/InboxCancelMessage.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ErrorMessage.java
@@ -20,13 +20,16 @@ package
org.apache.ignite.internal.processors.query.calcite.message;
import java.nio.ByteBuffer;
import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
/**
*
*/
-public class InboxCancelMessage implements ExecutionContextAware {
+public class ErrorMessage implements MarshalableMessage {
/** */
private UUID queryId;
@@ -34,44 +37,47 @@ public class InboxCancelMessage implements
ExecutionContextAware {
private long fragmentId;
/** */
- private long exchangeId;
+ private byte[] errBytes;
/** */
- private int batchId;
+ @GridDirectTransient
+ private Throwable err;
/** */
- public InboxCancelMessage(){}
+ public ErrorMessage() {
+ // No-op.
+ }
/** */
- public InboxCancelMessage(UUID queryId, long fragmentId, long exchangeId,
int batchId) {
+ public ErrorMessage(UUID queryId, long fragmentId, Throwable err) {
+ assert err != null;
+
this.queryId = queryId;
this.fragmentId = fragmentId;
- this.exchangeId = exchangeId;
- this.batchId = batchId;
+ this.err = err;
}
- /** {@inheritDoc} */
- @Override public UUID queryId() {
+ /**
+ * @return Query ID.
+ */
+ public UUID queryId() {
return queryId;
}
- /** {@inheritDoc} */
- @Override public long fragmentId() {
- return fragmentId;
- }
-
/**
- * @return Exchange ID.
+ * @return Fragment ID.
*/
- public long exchangeId() {
- return exchangeId;
+ public long fragmentId() {
+ return fragmentId;
}
/**
- * @return Batch ID.
+ * @return Marshaled Throwable.
*/
- public int batchId() {
- return batchId;
+ public Throwable error() {
+ assert err != null;
+
+ return err;
}
/** {@inheritDoc} */
@@ -87,24 +93,18 @@ public class InboxCancelMessage implements
ExecutionContextAware {
switch (writer.state()) {
case 0:
- if (!writer.writeInt("batchId", batchId))
+ if (!writer.writeByteArray("errBytes", errBytes))
return false;
writer.incrementState();
case 1:
- if (!writer.writeLong("exchangeId", exchangeId))
- return false;
-
- writer.incrementState();
-
- case 2:
if (!writer.writeLong("fragmentId", fragmentId))
return false;
writer.incrementState();
- case 3:
+ case 2:
if (!writer.writeUuid("queryId", queryId))
return false;
@@ -123,8 +123,8 @@ public class InboxCancelMessage implements
ExecutionContextAware {
return false;
switch (reader.state()) {
- case 0:
- batchId = reader.readInt("batchId");
+ case 0:
+ errBytes = reader.readByteArray("errBytes");
if (!reader.isLastRead())
return false;
@@ -132,14 +132,6 @@ public class InboxCancelMessage implements
ExecutionContextAware {
reader.incrementState();
case 1:
- exchangeId = reader.readLong("exchangeId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 2:
fragmentId = reader.readLong("fragmentId");
if (!reader.isLastRead())
@@ -147,7 +139,7 @@ public class InboxCancelMessage implements
ExecutionContextAware {
reader.incrementState();
- case 3:
+ case 2:
queryId = reader.readUuid("queryId");
if (!reader.isLastRead())
@@ -157,16 +149,26 @@ public class InboxCancelMessage implements
ExecutionContextAware {
}
- return reader.afterMessageRead(InboxCancelMessage.class);
+ return reader.afterMessageRead(ErrorMessage.class);
}
/** {@inheritDoc} */
@Override public MessageType type() {
- return MessageType.QUERY_INBOX_CANCEL_MESSAGE;
+ return MessageType.QUERY_ERROR_MESSAGE;
}
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 4;
+ return 3;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void prepareMarshal(Marshaller marshaller) throws
IgniteCheckedException {
+ errBytes = marshaller.marshal(err);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void prepareUnmarshal(Marshaller marshaller, ClassLoader
loader) throws IgniteCheckedException {
+ err = marshaller.unmarshal(errBytes, loader);
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/InboxCancelMessage.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/InboxCloseMessage.java
similarity index 79%
rename from
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/InboxCancelMessage.java
rename to
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/InboxCloseMessage.java
index 933d656..4398306 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/InboxCancelMessage.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/InboxCloseMessage.java
@@ -26,7 +26,7 @@ import
org.apache.ignite.plugin.extensions.communication.MessageWriter;
/**
*
*/
-public class InboxCancelMessage implements ExecutionContextAware {
+public class InboxCloseMessage implements CalciteMessage {
/** */
private UUID queryId;
@@ -37,26 +37,28 @@ public class InboxCancelMessage implements
ExecutionContextAware {
private long exchangeId;
/** */
- private int batchId;
-
- /** */
- public InboxCancelMessage(){}
+ public InboxCloseMessage() {
+ // No-op.
+ }
/** */
- public InboxCancelMessage(UUID queryId, long fragmentId, long exchangeId,
int batchId) {
+ public InboxCloseMessage(UUID queryId, long fragmentId, long exchangeId) {
this.queryId = queryId;
this.fragmentId = fragmentId;
this.exchangeId = exchangeId;
- this.batchId = batchId;
}
- /** {@inheritDoc} */
- @Override public UUID queryId() {
+ /**
+ * @return Query ID.
+ */
+ public UUID queryId() {
return queryId;
}
- /** {@inheritDoc} */
- @Override public long fragmentId() {
+ /**
+ * @return Fragment ID.
+ */
+ public long fragmentId() {
return fragmentId;
}
@@ -67,13 +69,6 @@ public class InboxCancelMessage implements
ExecutionContextAware {
return exchangeId;
}
- /**
- * @return Batch ID.
- */
- public int batchId() {
- return batchId;
- }
-
/** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
writer.setBuffer(buf);
@@ -87,24 +82,18 @@ public class InboxCancelMessage implements
ExecutionContextAware {
switch (writer.state()) {
case 0:
- if (!writer.writeInt("batchId", batchId))
- return false;
-
- writer.incrementState();
-
- case 1:
if (!writer.writeLong("exchangeId", exchangeId))
return false;
writer.incrementState();
- case 2:
+ case 1:
if (!writer.writeLong("fragmentId", fragmentId))
return false;
writer.incrementState();
- case 3:
+ case 2:
if (!writer.writeUuid("queryId", queryId))
return false;
@@ -124,14 +113,6 @@ public class InboxCancelMessage implements
ExecutionContextAware {
switch (reader.state()) {
case 0:
- batchId = reader.readInt("batchId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 1:
exchangeId = reader.readLong("exchangeId");
if (!reader.isLastRead())
@@ -139,7 +120,7 @@ public class InboxCancelMessage implements
ExecutionContextAware {
reader.incrementState();
- case 2:
+ case 1:
fragmentId = reader.readLong("fragmentId");
if (!reader.isLastRead())
@@ -147,7 +128,7 @@ public class InboxCancelMessage implements
ExecutionContextAware {
reader.incrementState();
- case 3:
+ case 2:
queryId = reader.readUuid("queryId");
if (!reader.isLastRead())
@@ -157,7 +138,7 @@ public class InboxCancelMessage implements
ExecutionContextAware {
}
- return reader.afterMessageRead(InboxCancelMessage.class);
+ return reader.afterMessageRead(InboxCloseMessage.class);
}
/** {@inheritDoc} */
@@ -167,6 +148,6 @@ public class InboxCancelMessage implements
ExecutionContextAware {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 4;
+ return 3;
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java
index 3444347..1376c00 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java
@@ -21,6 +21,7 @@ import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.query.calcite.util.Service;
+import org.apache.ignite.marshaller.Marshaller;
/**
*
@@ -35,10 +36,28 @@ public interface MessageService extends Service {
void send(UUID nodeId, CalciteMessage msg) throws IgniteCheckedException;
/**
+ * Checks whether a node with given ID is alive.
+ *
+ * @param nodeId Node ID.
+ * @return {@code True} if node is alive.
+ */
+ boolean alive(UUID nodeId);
+
+ /**
* Registers a listener for messages of a given type.
*
* @param lsnr Listener.
* @param type Message type.
*/
void register(MessageListener lsnr, MessageType type);
+
+ /**
+ * @return Message marshaller.
+ */
+ Marshaller marshaller();
+
+ /**
+ * @return Message class loader.
+ */
+ ClassLoader classLoader();
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java
index e8c3caa..1061c6e 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java
@@ -27,6 +27,7 @@ import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.managers.communication.GridIoManager;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
@@ -105,10 +106,8 @@ public class MessageServiceImpl extends AbstractService
implements MessageServic
this.classLoader = classLoader;
}
- /**
- * @return Class loader.
- */
- public ClassLoader classLoader() {
+ /** {@inheritDoc} */
+ @Override public ClassLoader classLoader() {
return classLoader;
}
@@ -133,10 +132,8 @@ public class MessageServiceImpl extends AbstractService
implements MessageServic
this.marsh = marsh;
}
- /**
- * @return Marshaller.
- */
- public Marshaller marshaller() {
+ /** {@inheritDoc} */
+ @Override public Marshaller marshaller() {
return marsh;
}
@@ -211,6 +208,16 @@ public class MessageServiceImpl extends AbstractService
implements MessageServic
assert old == null : old;
}
+ /** {@inheritDoc} */
+ @Override public boolean alive(UUID nodeId) {
+ try {
+ return !ioManager().checkNodeLeft(nodeId, null, false);
+ }
+ catch (IgniteClientDisconnectedCheckedException e) {
+ throw new AssertionError(e);
+ }
+ }
+
/** */
protected void prepareMarshal(Message msg) throws IgniteCheckedException {
try {
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java
index 9c1446c..6979b06 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java
@@ -28,11 +28,13 @@ import
org.apache.ignite.internal.processors.query.calcite.prepare.FragmentDescr
public enum MessageType {
QUERY_START_REQUEST(300, QueryStartRequest::new),
QUERY_START_RESPONSE(301, QueryStartResponse::new),
- QUERY_CANCEL_REQUEST(302, QueryCancelRequest::new),
+ QUERY_ERROR_MESSAGE(302, ErrorMessage::new),
QUERY_BATCH_MESSAGE(303, QueryBatchMessage::new),
QUERY_ACKNOWLEDGE_MESSAGE(304, QueryBatchAcknowledgeMessage::new),
- QUERY_INBOX_CANCEL_MESSAGE(305, InboxCancelMessage::new),
- GENERIC_ROW_MESSAGE(306, GenericRowMessage::new),
+ QUERY_INBOX_CANCEL_MESSAGE(305, InboxCloseMessage::new),
+ QUERY_OUTBOX_CANCEL_MESSAGE(306, OutboxCloseMessage::new),
+ GENERIC_ROW_MESSAGE(307, GenericRowMessage::new),
+
NODES_MAPPING(350, NodesMapping::new),
FRAGMENT_DESCRIPTION(351, FragmentDescription::new);
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryCancelRequest.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/OutboxCloseMessage.java
similarity index 62%
rename from
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryCancelRequest.java
rename to
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/OutboxCloseMessage.java
index 0a3766e..ba6021c 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryCancelRequest.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/OutboxCloseMessage.java
@@ -26,16 +26,26 @@ import
org.apache.ignite.plugin.extensions.communication.MessageWriter;
/**
*
*/
-public class QueryCancelRequest implements CalciteMessage {
+public class OutboxCloseMessage implements CalciteMessage {
/** */
private UUID queryId;
/** */
- QueryCancelRequest(){}
+ private long fragmentId;
/** */
- public QueryCancelRequest(UUID queryId) {
+ private long exchangeId;
+
+ /** */
+ public OutboxCloseMessage() {
+ // No-op.
+ }
+
+ /** */
+ public OutboxCloseMessage(UUID queryId, long fragmentId, long exchangeId) {
this.queryId = queryId;
+ this.fragmentId = fragmentId;
+ this.exchangeId = exchangeId;
}
/**
@@ -45,6 +55,20 @@ public class QueryCancelRequest implements CalciteMessage {
return queryId;
}
+ /**
+ * @return Fragment ID.
+ */
+ public long fragmentId() {
+ return fragmentId;
+ }
+
+ /**
+ * @return Exchange ID.
+ */
+ public long exchangeId() {
+ return exchangeId;
+ }
+
/** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
writer.setBuffer(buf);
@@ -58,6 +82,18 @@ public class QueryCancelRequest implements CalciteMessage {
switch (writer.state()) {
case 0:
+ if (!writer.writeLong("exchangeId", exchangeId))
+ return false;
+
+ writer.incrementState();
+
+ case 1:
+ if (!writer.writeLong("fragmentId", fragmentId))
+ return false;
+
+ writer.incrementState();
+
+ case 2:
if (!writer.writeUuid("queryId", queryId))
return false;
@@ -77,6 +113,22 @@ public class QueryCancelRequest implements CalciteMessage {
switch (reader.state()) {
case 0:
+ exchangeId = reader.readLong("exchangeId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 1:
+ fragmentId = reader.readLong("fragmentId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 2:
queryId = reader.readUuid("queryId");
if (!reader.isLastRead())
@@ -86,16 +138,16 @@ public class QueryCancelRequest implements CalciteMessage {
}
- return reader.afterMessageRead(QueryCancelRequest.class);
+ return reader.afterMessageRead(OutboxCloseMessage.class);
}
/** {@inheritDoc} */
@Override public MessageType type() {
- return MessageType.QUERY_CANCEL_REQUEST;
+ return MessageType.QUERY_OUTBOX_CANCEL_MESSAGE;
}
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 1;
+ return 3;
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartRequest.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartRequest.java
index 2d3c6b2..c5aacc8 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartRequest.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartRequest.java
@@ -31,7 +31,7 @@ import
org.apache.ignite.plugin.extensions.communication.MessageWriter;
/**
*
*/
-public class QueryStartRequest implements MarshalableMessage {
+public class QueryStartRequest implements MarshalableMessage,
ExecutionContextAware {
/** */
private String schema;
@@ -76,13 +76,16 @@ public class QueryStartRequest implements
MarshalableMessage {
return schema;
}
- /**
- * @return Query ID.
- */
- public UUID queryId() {
+ /** {@inheritDoc} */
+ @Override public UUID queryId() {
return qryId;
}
+ /** {@inheritDoc} */
+ @Override public long fragmentId() {
+ return fragmentDesc.fragmentId();
+ }
+
/**
* @return Fragment description.
*/
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RemoteException.java
similarity index 50%
copy from
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java
copy to
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RemoteException.java
index 3444347..9327034 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RemoteException.java
@@ -15,30 +15,54 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite.message;
+package org.apache.ignite.internal.processors.query.calcite.metadata;
import java.util.UUID;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.query.calcite.util.Service;
-
/**
*
*/
-public interface MessageService extends Service {
+public class RemoteException extends RuntimeException {
+ /** */
+ private final UUID nodeId;
+
+ /** */
+ private final UUID queryId;
+
+ /** */
+ private final long fragmentId;
+
/**
- * Sends a message to given node.
- *
+ * @param cause Cause.
* @param nodeId Node ID.
- * @param msg Message.
+ * @param queryId Query ID.
+ * @param fragmentId Fragment ID.
+ */
+ public RemoteException(UUID nodeId, UUID queryId, long fragmentId,
Throwable cause) {
+ super("Remote query execution", cause);
+ this.nodeId = nodeId;
+ this.queryId = queryId;
+ this.fragmentId = fragmentId;
+ }
+
+ /**
+ * @return Node ID.
+ */
+ public UUID nodeId() {
+ return nodeId;
+ }
+
+ /**
+ * @return Query ID.
*/
- void send(UUID nodeId, CalciteMessage msg) throws IgniteCheckedException;
+ public UUID queryId() {
+ return queryId;
+ }
/**
- * Registers a listener for messages of a given type.
- *
- * @param lsnr Listener.
- * @param type Message type.
+ * @return Fragment ID.
*/
- void register(MessageListener lsnr, MessageType type);
+ public long fragmentId() {
+ return fragmentId;
+ }
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
index 28ce580..551a816 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
@@ -207,6 +207,7 @@ class Cloner implements IgniteRelVisitor<IgniteRel> {
this.root = root;
}
+ /** */
Fragment build() {
return new Fragment(id, root, remotes.build());
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
index a53b357..214dcc9 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
@@ -216,20 +216,6 @@ public final class Commons {
U.closeQuiet((AutoCloseable) o);
}
- /**
- * @param o Object to close.
- * @param e Exception, what causes close.
- */
- public static void closeQuiet(Object o, @Nullable Exception e) {
- if (!(o instanceof AutoCloseable))
- return;
-
- if (e != null)
- U.closeWithSuppressingException((AutoCloseable) o, e);
- else
- U.closeQuiet((AutoCloseable) o);
- }
-
/** */
public static RelDataType combinedRowType(IgniteTypeFactory typeFactory,
RelDataType... types) {
RelDataTypeFactory.Builder builder = new
RelDataTypeFactory.Builder(typeFactory);
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CancelTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CancelTest.java
new file mode 100644
index 0000000..fd83b3b
--- /dev/null
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CancelTest.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.query.calcite;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.QueryEngine;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static java.util.Collections.singletonList;
+
+/**
+ * Cancel query test.
+ */
+public class CancelTest extends GridCommonAbstractTest {
+ /** Partition release timeout. */
+ private static final long PART_RELEASE_TIMEOUT = 5000L;
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ startGrids(2);
+
+ IgniteCache<Integer, String> c = grid(0).cache("TEST");
+
+ fillCache(c, 5000);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ super.afterTest();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ QueryEntity ePart = new QueryEntity()
+ .setTableName("TEST")
+ .setKeyType(Integer.class.getName())
+ .setValueType(String.class.getName())
+ .setKeyFieldName("id")
+ .setValueFieldName("val")
+ .addQueryField("id", Integer.class.getName(), null)
+ .addQueryField("val", String.class.getName(), null);;
+
+ return super.getConfiguration(igniteInstanceName)
+ .setCacheConfiguration(
+ new CacheConfiguration<>(ePart.getTableName())
+ .setAffinity(new RendezvousAffinityFunction(false, 8))
+ .setCacheMode(CacheMode.PARTITIONED)
+ .setQueryEntities(singletonList(ePart))
+ .setSqlSchema("PUBLIC"));
+ }
+
+ /**
+ *
+ */
+ @Test
+ public void testCancel() throws Exception {
+ QueryEngine engine = Commons.lookupComponent(grid(0).context(),
QueryEngine.class);
+
+ List<FieldsQueryCursor<List<?>>> cursors =
+ engine.query(null, "PUBLIC",
+ "SELECT * FROM TEST",
+ X.EMPTY_OBJECT_ARRAY);
+
+ Iterator<List<?>> it = cursors.get(0).iterator();
+
+ it.next();
+
+ cursors.forEach(QueryCursor::close);
+
+ GridTestUtils.assertThrows(log, () -> {
+ it.next();
+
+ return null;
+ },
+ IgniteSQLException.class, "The query was cancelled while executing"
+ );
+
+ awaitReservationsRelease("TEST");
+ }
+
+ /**
+ *
+ */
+ @Test
+ public void testNotOriginatorNodeStop() throws Exception {
+ QueryEngine engine = Commons.lookupComponent(grid(0).context(),
QueryEngine.class);
+
+ List<FieldsQueryCursor<List<?>>> cursors =
+ engine.query(null, "PUBLIC",
+ "SELECT * FROM TEST",
+ X.EMPTY_OBJECT_ARRAY);
+
+ Iterator<List<?>> it = cursors.get(0).iterator();
+
+ it.next();
+
+ stopGrid(1);
+
+ GridTestUtils.assertThrowsAnyCause(log, () -> {
+ while (it.hasNext())
+ it.next();
+
+ return null;
+ },
+ ClusterTopologyCheckedException.class, "Node left"
+ );
+
+ awaitReservationsRelease(grid(0), "TEST");
+ }
+
+ /**
+ *
+ */
+ @Test
+ public void testOriginatorNodeStop() throws Exception {
+ QueryEngine engine = Commons.lookupComponent(grid(0).context(),
QueryEngine.class);
+
+ List<FieldsQueryCursor<List<?>>> cursors =
+ engine.query(null, "PUBLIC",
+ "SELECT * FROM TEST",
+ X.EMPTY_OBJECT_ARRAY);
+
+ Iterator<List<?>> it = cursors.get(0).iterator();
+
+ it.next();
+
+ stopGrid(0);
+
+ awaitReservationsRelease(grid(1), "TEST");
+ }
+
+ /**
+ *
+ */
+ @Test
+ public void testReadToEnd() throws Exception {
+ QueryEngine engine = Commons.lookupComponent(grid(0).context(),
QueryEngine.class);
+
+ List<FieldsQueryCursor<List<?>>> cursors =
+ engine.query(null, "PUBLIC",
+ "SELECT * FROM TEST WHERE ID < 1",
+ X.EMPTY_OBJECT_ARRAY);
+
+ Iterator<List<?>> it = cursors.get(0).iterator();
+
+ it.next();
+
+ GridTestUtils.assertThrows(log, () -> {
+ it.next();
+
+ return null;
+ },
+ NoSuchElementException.class, null
+ );
+
+ GridTestUtils.assertThrows(log, () -> {
+ it.next();
+
+ return null;
+ },
+ NoSuchElementException.class, null
+ );
+
+ // Checks that all partition are unreserved.
+ awaitReservationsRelease("TEST");
+ }
+
+ /**
+ *
+ */
+ @Test
+ public void testFullReadToEnd() throws Exception {
+ QueryEngine engine = Commons.lookupComponent(grid(0).context(),
QueryEngine.class);
+
+ List<FieldsQueryCursor<List<?>>> cursors =
+ engine.query(null, "PUBLIC",
+ "SELECT * FROM TEST WHERE ID < 1",
+ X.EMPTY_OBJECT_ARRAY);
+
+ cursors.get(0).getAll();
+
+ // Checks that all partition are unreserved.
+ awaitReservationsRelease("TEST");
+ }
+
+ /**
+ * @param c Cache.
+ * @param rows Rows count.
+ */
+ private void fillCache(IgniteCache c, int rows) throws
InterruptedException {
+ c.clear();
+
+ for (int i = 0; i < rows; ++i)
+ c.put(i, "val_" + i);
+
+ awaitPartitionMapExchange();
+ }
+
+ /**
+ *
+ */
+ private void startNewNode() throws Exception {
+ startGrid(2);
+
+ awaitPartitionMapExchange();
+ }
+
+ /**
+ * @param cacheName Cache to check
+ * @throws IgniteInterruptedCheckedException
+ */
+ void awaitReservationsRelease(String cacheName) throws
IgniteInterruptedCheckedException {
+ for (Ignite ign : G.allGrids())
+ awaitReservationsRelease((IgniteEx)ign, "TEST");
+ }
+
+ /**
+ * @param node Node to check reservation.
+ * @param cacheName Cache to check reservations.
+ */
+ void awaitReservationsRelease(IgniteEx node, String cacheName) throws
IgniteInterruptedCheckedException {
+ GridDhtAtomicCache c =
GridTestUtils.getFieldValue(node.cachex(cacheName), "delegate");
+
+ List<GridDhtLocalPartition> parts = c.topology().localPartitions();
+
+ GridTestUtils.waitForCondition(() -> {
+ for (GridDhtLocalPartition p : parts) {
+ if (p.reservations() > 0)
+ return false;
+ }
+
+ return true;
+ }, PART_RELEASE_TIMEOUT);
+
+ for (GridDhtLocalPartition p : parts)
+ assertEquals("Partition is reserved: " + p, 0, p.reservations());
+ }
+}
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/PlannerTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/PlannerTest.java
index 7968f0a..aed1f7c 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/PlannerTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/PlannerTest.java
@@ -1323,7 +1323,7 @@ public class PlannerTest extends GridCommonAbstractTest {
exec = new LogicalRelImplementor<>(ectx, c1 -> r1 -> 0,
mailboxRegistry, exchangeSvc,
new TestFailureProcessor(kernal)).go(fragment.root());
- RootNode<Object[]> consumer = new RootNode<>(ectx, r -> {});
+ RootNode<Object[]> consumer = new RootNode<>(ectx);
consumer.register(exec);
//// Remote part
@@ -1568,7 +1568,7 @@ public class PlannerTest extends GridCommonAbstractTest {
exec = new LogicalRelImplementor<>(ectx, c1 -> r1 -> 0,
mailboxRegistry, exchangeSvc,
new TestFailureProcessor(kernal)).go(fragment.root());
- RootNode<Object[]> consumer = new RootNode<>(ectx, r -> {});
+ RootNode<Object[]> consumer = new RootNode<>(ectx);
consumer.register(exec);
//// Remote part
@@ -2780,6 +2780,11 @@ public class PlannerTest extends GridCommonAbstractTest {
}
/** {@inheritDoc} */
+ @Override public boolean alive(UUID nodeId) {
+ return true;
+ }
+
+ /** {@inheritDoc} */
@Override protected void prepareMarshal(Message msg) {
// No-op;
}
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
index c3c7f9a..c2c625c 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
@@ -180,6 +180,11 @@ public class AbstractExecutionTest extends
GridCommonAbstractTest {
}
/** {@inheritDoc} */
+ @Override public boolean alive(UUID nodeId) {
+ return true;
+ }
+
+ /** {@inheritDoc} */
@Override protected void prepareMarshal(Message msg) {
// No-op;
}
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ContinuousExecutionTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ContinuousExecutionTest.java
index cf4f8c5..ae07e61 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ContinuousExecutionTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ContinuousExecutionTest.java
@@ -142,7 +142,7 @@ public class ContinuousExecutionTest extends
AbstractExecutionTest {
inbox.init(ectx, nodes.subList(1, nodes.size()), null);
- RootNode<Object[]> node = new RootNode<>(ectx, r -> {});
+ RootNode<Object[]> node = new RootNode<>(ectx);
node.register(inbox);
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java
index 21bd420..086225a 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java
@@ -96,7 +96,7 @@ public class ExecutionTest extends AbstractExecutionTest {
FilterNode<Object[]> filter = new FilterNode<>(ctx, r -> (Integer)
r[0] >= 2);
filter.register(project);
- RootNode<Object[]> node = new RootNode<>(ctx, r -> {});
+ RootNode<Object[]> node = new RootNode<>(ctx);
node.register(filter);
assert node.hasNext();
@@ -140,7 +140,7 @@ public class ExecutionTest extends AbstractExecutionTest {
UnionAllNode<Object[]> union = new UnionAllNode<>(ctx);
union.register(F.asList(scan1, scan2, scan3));
- RootNode<Object[]> root = new RootNode<>(ctx, c -> {});
+ RootNode<Object[]> root = new RootNode<>(ctx);
root.register(union);
assertTrue(root.hasNext());
@@ -193,7 +193,7 @@ public class ExecutionTest extends AbstractExecutionTest {
ProjectNode<Object[]> project = new ProjectNode<>(ctx, r -> new
Object[]{r[0], r[1], r[4]});
project.register(join);
- RootNode<Object[]> node = new RootNode<>(ctx, r -> {});
+ RootNode<Object[]> node = new RootNode<>(ctx);
node.register(project);
assert node.hasNext();
@@ -252,7 +252,7 @@ public class ExecutionTest extends AbstractExecutionTest {
ProjectNode<Object[]> project = new ProjectNode<>(ctx, r -> new
Object[]{r[2], r[3], r[1]});
project.register(join);
- RootNode<Object[]> node = new RootNode<>(ctx, r -> {});
+ RootNode<Object[]> node = new RootNode<>(ctx);
node.register(project);
assert node.hasNext();
@@ -320,7 +320,7 @@ public class ExecutionTest extends AbstractExecutionTest {
ProjectNode<Object[]> project = new ProjectNode<>(ctx, r -> new
Object[]{r[0], r[1], r[4]});
project.register(join);
- RootNode<Object[]> node = new RootNode<>(ctx, r -> {});
+ RootNode<Object[]> node = new RootNode<>(ctx);
node.register(project);
assert node.hasNext();
@@ -371,7 +371,7 @@ public class ExecutionTest extends AbstractExecutionTest {
ProjectNode<Object[]> project = new ProjectNode<>(ctx, r -> new
Object[]{r[1]});
project.register(join);
- RootNode<Object[]> node = new RootNode<>(ctx, r -> {});
+ RootNode<Object[]> node = new RootNode<>(ctx);
node.register(project);
assert node.hasNext();
@@ -419,7 +419,7 @@ public class ExecutionTest extends AbstractExecutionTest {
ProjectNode<Object[]> project = new ProjectNode<>(ctx, r -> new
Object[]{r[1]});
project.register(join);
- RootNode<Object[]> node = new RootNode<>(ctx, r -> {});
+ RootNode<Object[]> node = new RootNode<>(ctx);
node.register(project);
assert node.hasNext();
@@ -471,7 +471,7 @@ public class ExecutionTest extends AbstractExecutionTest {
AggregateNode<Object[]> reduce = new AggregateNode<>(ctx, REDUCE,
grpSets, accFactory(ctx, call, REDUCE, rowType), rowFactory());
reduce.register(map);
- RootNode<Object[]> root = new RootNode<>(ctx, c -> {});
+ RootNode<Object[]> root = new RootNode<>(ctx);
root.register(reduce);
assertTrue(root.hasNext());
@@ -517,7 +517,7 @@ public class ExecutionTest extends AbstractExecutionTest {
AggregateNode<Object[]> reduce = new AggregateNode<>(ctx, REDUCE,
grpSets, accFactory(ctx, call, REDUCE, rowType), rowFactory());
reduce.register(map);
- RootNode<Object[]> root = new RootNode<>(ctx, c -> {});
+ RootNode<Object[]> root = new RootNode<>(ctx);
root.register(reduce);
assertTrue(root.hasNext());
@@ -562,7 +562,7 @@ public class ExecutionTest extends AbstractExecutionTest {
AggregateNode<Object[]> reduce = new AggregateNode<>(ctx, REDUCE,
grpSets, accFactory(ctx, call, REDUCE, rowType), rowFactory());
reduce.register(map);
- RootNode<Object[]> root = new RootNode<>(ctx, c -> {});
+ RootNode<Object[]> root = new RootNode<>(ctx);
root.register(reduce);
assertTrue(root.hasNext());
@@ -607,7 +607,7 @@ public class ExecutionTest extends AbstractExecutionTest {
AggregateNode<Object[]> reduce = new AggregateNode<>(ctx, REDUCE,
grpSets, accFactory(ctx, call, REDUCE, rowType), rowFactory());
reduce.register(map);
- RootNode<Object[]> root = new RootNode<>(ctx, c -> {});
+ RootNode<Object[]> root = new RootNode<>(ctx);
root.register(reduce);
assertTrue(root.hasNext());
@@ -652,7 +652,7 @@ public class ExecutionTest extends AbstractExecutionTest {
AggregateNode<Object[]> reduce = new AggregateNode<>(ctx, REDUCE,
grpSets, accFactory(ctx, call, REDUCE, rowType), rowFactory());
reduce.register(map);
- RootNode<Object[]> root = new RootNode<>(ctx, c -> {});
+ RootNode<Object[]> root = new RootNode<>(ctx);
root.register(reduce);
assertTrue(root.hasNext());
@@ -694,7 +694,7 @@ public class ExecutionTest extends AbstractExecutionTest {
AggregateNode<Object[]> agg = new AggregateNode<>(ctx, SINGLE,
grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory());
agg.register(scan);
- RootNode<Object[]> root = new RootNode<>(ctx, c -> {});
+ RootNode<Object[]> root = new RootNode<>(ctx);
root.register(agg);
assertTrue(root.hasNext());
@@ -738,7 +738,7 @@ public class ExecutionTest extends AbstractExecutionTest {
AggregateNode<Object[]> agg = new AggregateNode<>(ctx, SINGLE,
grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory());
agg.register(scan);
- RootNode<Object[]> root = new RootNode<>(ctx, c -> {});
+ RootNode<Object[]> root = new RootNode<>(ctx);
root.register(agg);
assertTrue(root.hasNext());
@@ -780,7 +780,7 @@ public class ExecutionTest extends AbstractExecutionTest {
AggregateNode<Object[]> agg = new AggregateNode<>(ctx, SINGLE,
grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory());
agg.register(scan);
- RootNode<Object[]> root = new RootNode<>(ctx, c -> {});
+ RootNode<Object[]> root = new RootNode<>(ctx);
root.register(agg);
assertTrue(root.hasNext());
@@ -822,7 +822,7 @@ public class ExecutionTest extends AbstractExecutionTest {
AggregateNode<Object[]> agg = new AggregateNode<>(ctx, SINGLE,
grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory());
agg.register(scan);
- RootNode<Object[]> root = new RootNode<>(ctx, c -> {});
+ RootNode<Object[]> root = new RootNode<>(ctx);
root.register(agg);
assertTrue(root.hasNext());
@@ -864,7 +864,7 @@ public class ExecutionTest extends AbstractExecutionTest {
AggregateNode<Object[]> agg = new AggregateNode<>(ctx, SINGLE,
grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory());
agg.register(scan);
- RootNode<Object[]> root = new RootNode<>(ctx, c -> {});
+ RootNode<Object[]> root = new RootNode<>(ctx);
root.register(agg);
assertTrue(root.hasNext());
@@ -907,7 +907,7 @@ public class ExecutionTest extends AbstractExecutionTest {
AggregateNode<Object[]> agg = new AggregateNode<>(ctx, SINGLE,
grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory());
agg.register(scan);
- RootNode<Object[]> root = new RootNode<>(ctx, c -> {});
+ RootNode<Object[]> root = new RootNode<>(ctx);
root.register(agg);
assertTrue(root.hasNext());
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
index 26b66c9..912d8dc 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
@@ -19,6 +19,7 @@ package org.apache.ignite.testsuites;
import
org.apache.ignite.internal.processors.query.calcite.CalciteBasicSecondaryIndexIntegrationTest;
import
org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessorTest;
+import org.apache.ignite.internal.processors.query.calcite.CancelTest;
import org.apache.ignite.internal.processors.query.calcite.PlannerTest;
import
org.apache.ignite.internal.processors.query.calcite.exec.ClosableIteratorsHolderTest;
import
org.apache.ignite.internal.processors.query.calcite.exec.rel.ContinuousExecutionTest;
@@ -40,7 +41,8 @@ import org.junit.runners.Suite;
ContinuousExecutionTest.class,
CalciteQueryProcessorTest.class,
JdbcQueryTest.class,
- CalciteBasicSecondaryIndexIntegrationTest.class
+ CalciteBasicSecondaryIndexIntegrationTest.class,
+ CancelTest.class
})
public class IgniteCalciteTestSuite {
}