This is an automated email from the ASF dual-hosted git repository. gvvinblade pushed a commit to branch ignite-12248 in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-12248 by this push: new bff7ad8 IGNITE-13198: Calcite integration. Rework error / cancel logic at execution. This closes #7981 bff7ad8 is described below commit bff7ad8d6aa36e1381b9e83be456c47a65d992e1 Author: Taras Ledkov <tled...@gridgain.com> AuthorDate: Tue Aug 25 22:00:27 2020 +0300 IGNITE-13198: Calcite integration. Rework error / cancel logic at execution. This closes #7981 --- .../query/calcite/exec/ExchangeService.java | 27 +- .../query/calcite/exec/ExchangeServiceImpl.java | 63 ++++- .../query/calcite/exec/ExecutionContext.java | 18 -- .../query/calcite/exec/ExecutionServiceImpl.java | 289 +++++++-------------- .../processors/query/calcite/exec/IndexScan.java | 256 +++++++++--------- .../query/calcite/exec/LogicalRelImplementor.java | 4 +- .../query/calcite/exec/MailboxRegistry.java | 12 +- .../query/calcite/exec/MailboxRegistryImpl.java | 91 +++++-- .../processors/query/calcite/exec/TableScan.java | 251 ------------------ .../query/calcite/exec/rel/AbstractJoinNode.java | 15 ++ .../query/calcite/exec/rel/AbstractNode.java | 23 +- .../query/calcite/exec/rel/AggregateNode.java | 9 + .../query/calcite/exec/rel/FilterNode.java | 9 + .../processors/query/calcite/exec/rel/Inbox.java | 122 ++++++--- .../MessageService.java => exec/rel/Mailbox.java} | 32 ++- .../query/calcite/exec/rel/ModifyNode.java | 9 + .../processors/query/calcite/exec/rel/Node.java | 7 +- .../processors/query/calcite/exec/rel/Outbox.java | 93 ++++--- .../query/calcite/exec/rel/ProjectNode.java | 9 + .../query/calcite/exec/rel/RootNode.java | 117 +++++---- .../query/calcite/exec/rel/ScanNode.java | 45 +++- .../query/calcite/exec/rel/SortNode.java | 43 ++- .../{InboxCancelMessage.java => ErrorMessage.java} | 86 +++--- ...oxCancelMessage.java => InboxCloseMessage.java} | 57 ++-- .../query/calcite/message/MessageService.java | 19 ++ .../query/calcite/message/MessageServiceImpl.java | 23 +- .../query/calcite/message/MessageType.java | 8 +- ...yCancelRequest.java => OutboxCloseMessage.java} | 64 ++++- .../query/calcite/message/QueryStartRequest.java | 13 +- .../RemoteException.java} | 52 +++- .../processors/query/calcite/prepare/Cloner.java | 1 + .../processors/query/calcite/util/Commons.java | 14 - .../processors/query/calcite/CancelTest.java | 277 ++++++++++++++++++++ .../processors/query/calcite/PlannerTest.java | 9 +- .../calcite/exec/rel/AbstractExecutionTest.java | 5 + .../calcite/exec/rel/ContinuousExecutionTest.java | 2 +- .../query/calcite/exec/rel/ExecutionTest.java | 36 +-- .../ignite/testsuites/IgniteCalciteTestSuite.java | 4 +- 38 files changed, 1224 insertions(+), 990 deletions(-) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeService.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeService.java index 731023b..ac1f58f 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeService.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeService.java @@ -56,7 +56,30 @@ public interface ExchangeService extends Service { * @param qryId Query ID. * @param fragmentId Target fragment ID. * @param exchangeId Exchange ID. - * @param batchId Batch ID. */ - void cancel(UUID nodeId, UUID qryId, long fragmentId, long exchangeId, int batchId) throws IgniteCheckedException; + void closeInbox(UUID nodeId, UUID qryId, long fragmentId, long exchangeId) throws IgniteCheckedException; + + /** + * Sends cancel request. + * @param nodeId Target node ID. + * @param qryId Query ID. + * @param fragmentId Target fragment ID. + * @param exchangeId Exchange ID. + */ + void closeOutbox(UUID nodeId, UUID qryId, long fragmentId, long exchangeId) throws IgniteCheckedException; + + /** + * @param nodeId Target node ID. + * @param qryId Query ID. + * @param fragmentId Source fragment ID. + * @param err Exception to send. + * @throws IgniteCheckedException On error marshaling or send ErrorMessage. + */ + void sendError(UUID nodeId, UUID qryId, long fragmentId, Throwable err) throws IgniteCheckedException; + + /** + * @param nodeId Node ID. + * @return {@code true} if node is alive, {@code false} otherwise. + */ + boolean alive(UUID nodeId); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java index 4ab6cb6..fc2879b 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.query.calcite.exec; +import java.util.Collection; import java.util.List; import java.util.Objects; import java.util.UUID; @@ -27,15 +28,18 @@ import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor; import org.apache.ignite.internal.processors.query.calcite.exec.rel.Inbox; import org.apache.ignite.internal.processors.query.calcite.exec.rel.Outbox; -import org.apache.ignite.internal.processors.query.calcite.message.InboxCancelMessage; +import org.apache.ignite.internal.processors.query.calcite.message.ErrorMessage; +import org.apache.ignite.internal.processors.query.calcite.message.InboxCloseMessage; import org.apache.ignite.internal.processors.query.calcite.message.MessageService; import org.apache.ignite.internal.processors.query.calcite.message.MessageType; +import org.apache.ignite.internal.processors.query.calcite.message.OutboxCloseMessage; import org.apache.ignite.internal.processors.query.calcite.message.QueryBatchAcknowledgeMessage; import org.apache.ignite.internal.processors.query.calcite.message.QueryBatchMessage; import org.apache.ignite.internal.processors.query.calcite.prepare.FragmentDescription; import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext; import org.apache.ignite.internal.processors.query.calcite.util.AbstractService; import org.apache.ignite.internal.processors.query.calcite.util.Commons; +import org.apache.ignite.internal.util.typedef.F; /** * @@ -111,8 +115,18 @@ public class ExchangeServiceImpl extends AbstractService implements ExchangeServ } /** {@inheritDoc} */ - @Override public void cancel(UUID nodeId, UUID qryId, long fragmentId, long exchangeId, int batchId) throws IgniteCheckedException { - messageService().send(nodeId, new InboxCancelMessage(qryId, fragmentId, exchangeId, batchId)); + @Override public void closeOutbox(UUID nodeId, UUID qryId, long fragmentId, long exchangeId) throws IgniteCheckedException { + messageService().send(nodeId, new OutboxCloseMessage(qryId, fragmentId, exchangeId)); + } + + /** {@inheritDoc} */ + @Override public void closeInbox(UUID nodeId, UUID qryId, long fragmentId, long exchangeId) throws IgniteCheckedException { + messageService().send(nodeId, new InboxCloseMessage(qryId, fragmentId, exchangeId)); + } + + /** {@inheritDoc} */ + @Override public void sendError(UUID nodeId, UUID qryId, long fragmentId, Throwable err) throws IgniteCheckedException { + messageService().send(nodeId, new ErrorMessage(qryId, fragmentId, err)); } /** {@inheritDoc} */ @@ -129,24 +143,46 @@ public class ExchangeServiceImpl extends AbstractService implements ExchangeServ /** {@inheritDoc} */ @Override public void init() { - messageService().register((n, m) -> onMessage(n, (InboxCancelMessage) m), MessageType.QUERY_INBOX_CANCEL_MESSAGE); + messageService().register((n, m) -> onMessage(n, (InboxCloseMessage) m), MessageType.QUERY_INBOX_CANCEL_MESSAGE); + messageService().register((n, m) -> onMessage(n, (OutboxCloseMessage) m), MessageType.QUERY_OUTBOX_CANCEL_MESSAGE); messageService().register((n, m) -> onMessage(n, (QueryBatchAcknowledgeMessage) m), MessageType.QUERY_ACKNOWLEDGE_MESSAGE); messageService().register((n, m) -> onMessage(n, (QueryBatchMessage) m), MessageType.QUERY_BATCH_MESSAGE); } + /** {@inheritDoc} */ + @Override public boolean alive(UUID nodeId) { + return messageService().alive(nodeId); + } + /** */ - protected void onMessage(UUID nodeId, InboxCancelMessage msg) { - Inbox<?> inbox = mailboxRegistry().inbox(msg.queryId(), msg.exchangeId()); + protected void onMessage(UUID nodeId, InboxCloseMessage msg) { + Collection<Inbox<?>> inboxes = mailboxRegistry().inboxes(msg.queryId(), msg.fragmentId(), msg.exchangeId()); + if (!F.isEmpty(inboxes)) { + for (Inbox<?> inbox : inboxes) + inbox.context().execute(inbox::close); + } + else if (log.isDebugEnabled()) { + log.debug("Stale inbox cancel message received: [" + + "nodeId=" + nodeId + ", " + + "queryId=" + msg.queryId() + ", " + + "fragmentId=" + msg.fragmentId() + ", " + + "exchangeId=" + msg.exchangeId() + "]"); + } + } - if (inbox != null) - inbox.cancel(); + /** */ + protected void onMessage(UUID nodeId, OutboxCloseMessage msg) { + Collection<Outbox<?>> outboxes = mailboxRegistry().outboxes(msg.queryId(), msg.fragmentId(), msg.exchangeId()); + if (!F.isEmpty(outboxes)) { + for (Outbox<?> outbox : outboxes) + outbox.context().execute(outbox::close); + } else if (log.isDebugEnabled()) { - log.debug("Stale cancel message received: [" + + log.debug("Stale oubox cancel message received: [" + "nodeId=" + nodeId + ", " + "queryId=" + msg.queryId() + ", " + "fragmentId=" + msg.fragmentId() + ", " + - "exchangeId=" + msg.exchangeId() + ", " + - "batchId=" + msg.batchId() + "]"); + "exchangeId=" + msg.exchangeId() + "]"); } } @@ -173,7 +209,7 @@ public class ExchangeServiceImpl extends AbstractService implements ExchangeServ if (inbox == null && msg.batchId() == 0) { // first message sent before a fragment is built // note that an inbox source fragment id is also used as an exchange id - Inbox<?> newInbox = new Inbox<>(baseInboxContext(msg.queryId(), msg.fragmentId()), + Inbox<?> newInbox = new Inbox<>(baseInboxContext(nodeId, msg.queryId(), msg.fragmentId()), this, mailboxRegistry(), msg.exchangeId(), msg.exchangeId()); inbox = mailboxRegistry().register(newInbox); @@ -194,10 +230,11 @@ public class ExchangeServiceImpl extends AbstractService implements ExchangeServ /** * @return Minimal execution context to meet Inbox needs. */ - private ExecutionContext<?> baseInboxContext(UUID qryId, long fragmentId) { + private ExecutionContext<?> baseInboxContext(UUID nodeId, UUID qryId, long fragmentId) { return new ExecutionContext<>( taskExecutor(), PlanningContext.builder() + .originatingNodeId(nodeId) .logger(log) .build(), qryId, diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java index 70d52b0..4e7cdd5 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java @@ -57,9 +57,6 @@ public class ExecutionContext<Row> implements DataContext { /** */ private final ExpressionFactory<Row> expressionFactory; - /** */ - private volatile boolean cancelled; - /** * @param ctx Parent context. * @param qryId Query ID. @@ -150,13 +147,6 @@ public class ExecutionContext<Row> implements DataContext { } /** - * @return Cancelled flag. - */ - public boolean cancelled() { - return cancelled; - } - - /** * @return Handler to access row fields. */ public RowHandler<Row> rowHandler() { @@ -198,14 +188,6 @@ public class ExecutionContext<Row> implements DataContext { } /** - * Sets cancelled flag. - */ - public void markCancelled() { - if (!cancelled) - cancelled = true; - } - - /** * Executes a query task. * * @param task Query task. diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java index 4826dbe..aea46c7 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java @@ -27,7 +27,6 @@ import java.util.Objects; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; -import java.util.function.Predicate; import com.google.common.collect.ImmutableList; import org.apache.calcite.plan.ConventionTraitDef; @@ -49,7 +48,6 @@ import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.tools.Frameworks; import org.apache.calcite.tools.ValidationException; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteInterruptedException; import org.apache.ignite.cache.query.FieldsQueryCursor; import org.apache.ignite.cache.query.QueryCancelledException; import org.apache.ignite.events.EventType; @@ -68,17 +66,19 @@ import org.apache.ignite.internal.processors.query.IgniteSQLException; import org.apache.ignite.internal.processors.query.QueryCancellable; import org.apache.ignite.internal.processors.query.QueryContext; import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor; +import org.apache.ignite.internal.processors.query.calcite.exec.rel.Inbox; import org.apache.ignite.internal.processors.query.calcite.exec.rel.Node; import org.apache.ignite.internal.processors.query.calcite.exec.rel.Outbox; import org.apache.ignite.internal.processors.query.calcite.exec.rel.RootNode; +import org.apache.ignite.internal.processors.query.calcite.message.ErrorMessage; import org.apache.ignite.internal.processors.query.calcite.message.MessageService; import org.apache.ignite.internal.processors.query.calcite.message.MessageType; -import org.apache.ignite.internal.processors.query.calcite.message.QueryCancelRequest; import org.apache.ignite.internal.processors.query.calcite.message.QueryStartRequest; import org.apache.ignite.internal.processors.query.calcite.message.QueryStartResponse; import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService; import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping; import org.apache.ignite.internal.processors.query.calcite.metadata.PartitionService; +import org.apache.ignite.internal.processors.query.calcite.metadata.RemoteException; import org.apache.ignite.internal.processors.query.calcite.prepare.CacheKey; import org.apache.ignite.internal.processors.query.calcite.prepare.CalciteQueryFieldMetadata; import org.apache.ignite.internal.processors.query.calcite.prepare.ExplainPlan; @@ -399,13 +399,10 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut /** {@inheritDoc} */ @Override public void cancelQuery(UUID qryId) { - mailboxRegistry().outboxes(qryId).forEach(this::executeCancel); - mailboxRegistry().inboxes(qryId).forEach(this::executeCancel); - QueryInfo info = running.get(qryId); if (info != null) - info.cancel(); + info.doCancel(); } /** {@inheritDoc} */ @@ -435,7 +432,7 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut @Override public void init() { messageService().register((n,m) -> onMessage(n, (QueryStartRequest) m), MessageType.QUERY_START_REQUEST); messageService().register((n,m) -> onMessage(n, (QueryStartResponse) m), MessageType.QUERY_START_RESPONSE); - messageService().register((n,m) -> onMessage(n, (QueryCancelRequest) m), MessageType.QUERY_CANCEL_REQUEST); + messageService().register((n,m) -> onMessage(n, (ErrorMessage) m), MessageType.QUERY_ERROR_MESSAGE); eventManager().addDiscoveryEventListener(discoLsnr, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT); @@ -722,47 +719,38 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut register(info); // start remote execution - if (fragments.size() > 1) { - for (int i = 1; i < fragments.size(); i++) { - Fragment fragment0 = fragments.get(i); - NodesMapping mapping0 = plan.fragmentMapping(fragment0); - - boolean error = false; - - for (UUID nodeId : mapping0.nodes()) { - if (error) - info.onResponse(nodeId, fragment0.fragmentId(), new QueryCancelledException()); - else { - try { - FragmentDescription fragmentDesc0 = new FragmentDescription( - fragment0.fragmentId(), - mapping0.partitions(nodeId), - mapping0.assignments().size(), - plan.targetMapping(fragment0), - plan.remoteSources(fragment0) - ); - - QueryStartRequest req = new QueryStartRequest( - qryId, - pctx.schemaName(), - toJson(fragment0.root()), - pctx.topologyVersion(), - fragmentDesc0, - pctx.parameters()); - - messageService().send(nodeId, req); - } - catch (Exception e) { - info.onResponse(nodeId, fragment0.fragmentId(), e); - error = true; - } + for (int i = 1; i < fragments.size(); i++) { + Fragment fragment0 = fragments.get(i); + NodesMapping mapping0 = plan.fragmentMapping(fragment0); + + boolean error = false; + for (UUID nodeId : mapping0.nodes()) { + if (error) + info.onResponse(nodeId, fragment0.fragmentId(), new QueryCancelledException()); + else { + try { + FragmentDescription fragmentDesc0 = new FragmentDescription( + fragment0.fragmentId(), + mapping0.partitions(nodeId), + mapping0.assignments().size(), + plan.targetMapping(fragment0), + plan.remoteSources(fragment0) + ); + + QueryStartRequest req = new QueryStartRequest( + qryId, + pctx.schemaName(), + toJson(fragment0.root()), + pctx.topologyVersion(), + fragmentDesc0, + pctx.parameters()); + + messageService().send(nodeId, req); + } + catch (Exception e) { + info.onResponse(nodeId, fragment0.fragmentId(), e); + error = true; } - } - - if (error) { - info.awaitAllReplies(); - - throw new AssertionError(); // Previous call must throw an exception } } } @@ -844,58 +832,49 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut assert nodeId != null && msg != null; PlanningContext ctx = createContext(msg.schema(), nodeId, msg.topologyVersion()); + ExecutionContext<Row> execCtx = new ExecutionContext<>(taskExecutor(), ctx, msg.queryId(), + msg.fragmentDescription(), handler, Commons.parametersMap(msg.parameters())); + Outbox<Row> node; try { - ExecutionContext<Row> execCtx = new ExecutionContext<>( - taskExecutor(), - ctx, - msg.queryId(), - msg.fragmentDescription(), - handler, - Commons.parametersMap(msg.parameters()) - ); - - Node<Row> node = new LogicalRelImplementor<>( + node = new LogicalRelImplementor<>( execCtx, partitionService(), mailboxRegistry(), exchangeService(), failureProcessor()) .go(fromJson(ctx, msg.root())); - - assert node instanceof Outbox : node; - - node.context().execute(((Outbox<Row>) node)::init); - - messageService().send(nodeId, new QueryStartResponse(msg.queryId(), msg.fragmentDescription().fragmentId())); } - catch (Throwable ex) { // TODO don't catch errors! - cancelQuery(msg.queryId()); - - if (ex instanceof ClusterTopologyCheckedException) - return; + catch (Exception ex) { + U.error(log, "Failed to build execution tree. ", ex); - U.warn(log, "Failed to start query. [nodeId=" + nodeId + ']', ex); + mailboxRegistry.outboxes(msg.queryId(), msg.fragmentId(), -1) + .forEach(Outbox::close); + mailboxRegistry.inboxes(msg.queryId(), msg.fragmentId(), -1) + .forEach(Inbox::close); try { - messageService().send(nodeId, new QueryStartResponse(msg.queryId(), msg.fragmentDescription().fragmentId(), ex)); + messageService().send(nodeId, new QueryStartResponse(msg.queryId(), msg.fragmentId(), ex)); } catch (IgniteCheckedException e) { - e.addSuppressed(ex); - U.warn(log, "Failed to send reply. [nodeId=" + nodeId + ']', e); } - if (ex instanceof Error) - throw (Error)ex; + return; } - } - /** */ - private void onMessage(UUID nodeId, QueryCancelRequest msg) { - assert nodeId != null && msg != null; + try { + messageService().send(nodeId, new QueryStartResponse(msg.queryId(), msg.fragmentDescription().fragmentId())); + } + catch (IgniteCheckedException e) { + U.warn(log, "Failed to send reply. [nodeId=" + nodeId + ']', e); + + node.onNodeLeft(nodeId); + + return; + } - cancelQuery(msg.queryId()); + node.init(); } /** */ @@ -909,37 +888,18 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut } /** */ - private void onCursorClose(RootNode<?> rootNode) { - switch (rootNode.state()) { - case CANCELLED: - cancelQuery(rootNode.queryId()); + private void onMessage(UUID nodeId, ErrorMessage msg) { + assert nodeId != null && msg != null; - break; - case END: - running.remove(rootNode.queryId()); + QueryInfo info = running.get(msg.queryId()); - break; - default: - throw new AssertionError(); - } + if (info != null) + info.onError(new RemoteException(nodeId, msg.queryId(), msg.fragmentId(), msg.error())); } /** */ private void onNodeLeft(UUID nodeId) { running.forEach((uuid, queryInfo) -> queryInfo.onNodeLeft(nodeId)); - - final Predicate<Node<?>> p = new OriginatingFilter(nodeId); - - mailboxRegistry().outboxes(null).stream() - .filter(p).forEach(this::executeCancel); - - mailboxRegistry().inboxes(null).stream() - .filter(p).forEach(this::executeCancel); - } - - /** */ - private void executeCancel(Node<?> node) { - node.context().execute(node::cancel); } /** */ @@ -948,10 +908,10 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut RUNNING, /** */ - CANCELLING, + CLOSING, /** */ - CANCELLED + CLOSED } /** */ @@ -1009,13 +969,10 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut private QueryState state; /** */ - private Throwable error; - - /** */ private QueryInfo(ExecutionContext<Row> ctx, MultiStepPlan plan, Node<Row> root) { this.ctx = ctx; - RootNode<Row> rootNode = new RootNode<>(ctx, ExecutionServiceImpl.this::onCursorClose); + RootNode<Row> rootNode = new RootNode<>(ctx, this::tryClose); rootNode.register(root); this.root = rootNode; @@ -1043,71 +1000,43 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut /** {@inheritDoc} */ @Override public void doCancel() { - cancel(); - } - - /** */ - private void awaitAllReplies() { - Throwable error; - - try { - synchronized (this) { - while (!waiting.isEmpty()) - wait(); - - error = this.error; - } - } - catch (InterruptedException e) { - throw new IgniteInterruptedException(e); - } - - if (error != null) - throw new IgniteSQLException("Failed to execute query.", error); + root.close(); } - /** */ - private void cancel() { - boolean cancelLoc = false; - boolean cancelRemote = false; + /** + * Can be called multiple times after receive each error at {@link #onResponse(RemoteFragmentKey, Throwable)}. + */ + private void tryClose() { QueryState state0 = null; synchronized (this) { - if (state == QueryState.CANCELLED) + if (state == QueryState.CLOSED) return; - if (state == QueryState.RUNNING) { - cancelLoc = true; - state0 = state = QueryState.CANCELLING; - } + if (state == QueryState.RUNNING) + state0 = state = QueryState.CLOSING; - if (state == QueryState.CANCELLING && waiting.isEmpty()) { - cancelRemote = true; - state0 = state = QueryState.CANCELLED; - } + if (state == QueryState.CLOSING && waiting.isEmpty()) + state0 = state = QueryState.CLOSED; } - if (cancelLoc) - root.cancel(); + if (state0 == QueryState.CLOSED) { + // 1) unregister runing query + running.remove(ctx.queryId()); - if (cancelRemote) { - QueryCancelRequest msg = new QueryCancelRequest(ctx.queryId()); + // 2) close local fragment + root.proceedClose(); - for (UUID remote : remotes) { + // 3) close remote fragments + for (UUID nodeId : remotes) { try { - messageService().send(remote, msg); - } - catch (ClusterTopologyCheckedException e) { - U.warn(log, e.getMessage(), e); + exchangeService().closeOutbox(nodeId, ctx.queryId(), -1, -1); } catch (IgniteCheckedException e) { - throw U.convertException(e); + U.warn(log, "Failed to send cancel message. [nodeId=" + nodeId + ']', e); } } } - - if (state0 == QueryState.CANCELLED) - running.remove(ctx.queryId()); } /** */ @@ -1127,7 +1056,8 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut } if (!F.isEmpty(fragments)) { - ClusterTopologyCheckedException ex = new ClusterTopologyCheckedException("Failed to start query, node left. nodeId=" + nodeId); + ClusterTopologyCheckedException ex = new ClusterTopologyCheckedException( + "Failed to start query, node left. nodeId=" + nodeId); for (RemoteFragmentKey fragment : fragments) onResponse(fragment, ex); @@ -1141,46 +1071,23 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut /** */ private void onResponse(RemoteFragmentKey fragment, Throwable error) { - boolean cancel; - + QueryState state; synchronized (this) { - if (!waiting.remove(fragment)) - return; - - if (error != null) { - if (this.error != null) - this.error.addSuppressed(error); - else - this.error = error; - } - - boolean empty = waiting.isEmpty(); - - cancel = empty && this.error != null; - - if (empty) - notifyAll(); + waiting.remove(fragment); + state = this.state; } - if (cancel) - cancel(); + if (error != null) + onError(error); + else if (state == QueryState.CLOSING) + tryClose(); } - } - - /** */ - private static final class OriginatingFilter implements Predicate<Node<?>> { - /** */ - private final UUID nodeId; /** */ - private OriginatingFilter(UUID nodeId) { - this.nodeId = nodeId; - } + private void onError(Throwable error) { + root.onError(error); - /** {@inheritDoc} */ - @Override public boolean test(Node node) { - // Uninitialized inbox doesn't know originating node ID. - return Objects.equals(node.context().originatingNodeId(), nodeId); + tryClose(); } } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java index a6d720f..e15a8b6 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java @@ -43,30 +43,32 @@ import org.apache.ignite.internal.processors.query.calcite.schema.IgniteIndex; import org.apache.ignite.internal.processors.query.calcite.schema.TableDescriptor; import org.apache.ignite.internal.processors.query.h2.H2Utils; import org.apache.ignite.internal.processors.query.h2.database.H2TreeFilterClosure; +import org.apache.ignite.internal.processors.query.h2.opt.H2PlainRow; import org.apache.ignite.internal.processors.query.h2.opt.H2Row; -import org.apache.ignite.internal.util.GridCloseableIteratorAdapter; import org.apache.ignite.internal.util.lang.GridCursor; +import org.apache.ignite.internal.util.lang.GridIteratorAdapter; import org.apache.ignite.spi.indexing.IndexingQueryCacheFilter; import org.apache.ignite.spi.indexing.IndexingQueryFilter; import org.apache.ignite.spi.indexing.IndexingQueryFilterImpl; import org.h2.value.DataType; import org.h2.value.Value; +import org.jetbrains.annotations.NotNull; /** * Scan on index. */ -public class IndexScan<Row> implements Iterable<Row> { +public class IndexScan<Row> implements Iterable<Row>, AutoCloseable { /** */ - private final ExecutionContext<Row> ectx; + private final GridKernalContext kctx; /** */ - private final CacheObjectContext coCtx; + private final GridCacheContext<?, ?> cctx; /** */ - private final GridKernalContext ctx; + private final ExecutionContext<Row> ectx; /** */ - private final GridCacheContext<?, ?> cacheCtx; + private final CacheObjectContext coCtx; /** */ private final TableDescriptor desc; @@ -96,154 +98,172 @@ public class IndexScan<Row> implements Iterable<Row> { private final MvccSnapshot mvccSnapshot; /** */ - private List<GridDhtLocalPartition> partsToReserve; + private List<GridDhtLocalPartition> reserved; /** - * @param ctx Cache context. + * @param ectx Execution context. * @param igniteIdx Index tree. * @param filters Additional filters. * @param lowerBound Lower index scan bound. * @param upperBound Upper index scan bound. */ public IndexScan( - ExecutionContext<Row> ctx, + ExecutionContext<Row> ectx, IgniteIndex igniteIdx, Predicate<Row> filters, Row lowerBound, Row upperBound ) { - ectx = ctx; + this.ectx = ectx; desc = igniteIdx.table().descriptor(); + cctx = desc.cacheContext(); + kctx = cctx.kernalContext(); + coCtx = cctx.cacheObjectContext(); + + RelDataType rowType = desc.selectRowType(this.ectx.getTypeFactory()); + + factory = this.ectx.rowHandler().factory(this.ectx.getTypeFactory(), rowType); idx = igniteIdx.index(); + topVer = ectx.planningContext().topologyVersion(); this.filters = filters; this.lowerBound = lowerBound; this.upperBound = upperBound; - cacheCtx = igniteIdx.table().descriptor().cacheContext(); - coCtx = cacheCtx.cacheObjectContext(); - this.ctx = coCtx.kernalContext(); - topVer = ctx.planningContext().topologyVersion(); - partsArr = ctx.partitions(); - mvccSnapshot = ctx.mvccSnapshot(); - - RelDataType rowType = desc.selectRowType(ectx.getTypeFactory()); - factory = ectx.rowHandler().factory(ectx.getTypeFactory(), rowType); + partsArr = ectx.partitions(); + mvccSnapshot = ectx.mvccSnapshot(); } /** {@inheritDoc} */ - @Override public Iterator<Row> iterator() { - H2TreeFilterClosure filterC = filterClosure(); + @Override public synchronized Iterator<Row> iterator() { + reserve(); + try { + H2Row lower = lowerBound == null ? null : new H2PlainRow(values(coCtx, ectx, lowerBound)); + H2Row upper = upperBound == null ? null : new H2PlainRow(values(coCtx, ectx, upperBound)); - H2Row lower = lowerBound == null ? null : new CalciteH2Row<>(coCtx, ectx, lowerBound); - H2Row upper = upperBound == null ? null : new CalciteH2Row<>(coCtx, ectx, upperBound); - - reservePartitions(); - - GridCursor<H2Row> cur = idx.find(lower, upper, filterC); + return new IteratorImpl(idx.find(lower, upper, filterClosure())); + } + catch (Exception e) { + release(); - return new CursorIteratorWrapper(cur); + throw e; + } } /** */ - public H2TreeFilterClosure filterClosure() { - IndexingQueryFilter filter = new IndexingQueryFilterImpl(ctx, topVer, partsArr); - IndexingQueryCacheFilter f = filter.forCache(cacheCtx.name()); - H2TreeFilterClosure filterC = null; - - if (f != null || mvccSnapshot != null ) - filterC = new H2TreeFilterClosure(f, mvccSnapshot, cacheCtx, ectx.planningContext().logger()); - - return filterC; + @Override public void close() { + release(); } /** */ - private void reservePartitions() { - assert partsToReserve == null : partsToReserve; + private synchronized void reserve() { + if (reserved != null) + return; - partsToReserve = gatherPartitions(cacheCtx, partsArr); + List<GridDhtLocalPartition> toReserve; - for (GridDhtLocalPartition part : partsToReserve) { - if (part == null || !part.reserve()) - throw reservationException(); - else if (part.state() != GridDhtPartitionState.OWNING) { - part.release(); + if (cctx.isReplicated()) { + int partsCnt = cctx.affinity().partitions(); + toReserve = new ArrayList<>(partsCnt); + GridDhtPartitionTopology top = cctx.topology(); + for (int i = 0; i < partsCnt; i++) + toReserve.add(top.localPartition(i)); + } + else if (cctx.isPartitioned()) { + assert partsArr != null; - throw reservationException(); - } + toReserve = new ArrayList<>(partsArr.length); + GridDhtPartitionTopology top = cctx.topology(); + for (int i = 0; i < partsArr.length; i++) + toReserve.add(top.localPartition(partsArr[i])); } - } + else { + assert cctx.isLocal(); - /** */ - private List<GridDhtLocalPartition> gatherPartitions(GridCacheContext<?, ?> ctx, int[] arr ) { - if (ctx.isReplicated()) { - int partsCnt = ctx.affinity().partitions(); - GridDhtPartitionTopology top = ctx.topology(); - List<GridDhtLocalPartition> parts = new ArrayList<>(partsCnt); + toReserve = Collections.emptyList(); + } - for (int i = 0; i < partsCnt; i++) - parts.add(top.localPartition(i)); + reserved = new ArrayList<>(toReserve.size()); - return parts; - } - else if (ctx.isPartitioned()) { - assert arr != null; - List<GridDhtLocalPartition> parts = new ArrayList<>(arr.length); - GridDhtPartitionTopology top = ctx.topology(); + try { + for (GridDhtLocalPartition part : toReserve) { + if (part == null || !part.reserve()) + throw new IgniteSQLException("Failed to reserve partition for query execution. Retry on stable topology."); + else if (part.state() != GridDhtPartitionState.OWNING) { + part.release(); - for (int i = 0; i < arr.length; i++) - parts.add(top.localPartition(arr[i])); + throw new IgniteSQLException("Failed to reserve partition for query execution. Retry on stable topology."); + } - return parts; + reserved.add(part); + } } - else { - assert ctx.isLocal(); + catch (Exception e) { + release(); - return Collections.emptyList(); + throw e; } } /** */ - private void releasePartitions() { - assert partsToReserve != null; + private synchronized void release() { + if (reserved == null) + return; - for (GridDhtLocalPartition part : partsToReserve) + for (GridDhtLocalPartition part : reserved) part.release(); + + reserved = null; } /** */ - private IgniteSQLException reservationException() { - return new IgniteSQLException("Failed to reserve partition for query execution. Retry on stable topology."); + private H2TreeFilterClosure filterClosure() { + IndexingQueryFilter filter = new IndexingQueryFilterImpl(kctx, topVer, partsArr); + IndexingQueryCacheFilter f = filter.forCache(cctx.name()); + H2TreeFilterClosure filterC = null; + + if (f != null || mvccSnapshot != null ) + filterC = new H2TreeFilterClosure(f, mvccSnapshot, cctx, ectx.planningContext().logger()); + + return filterC; } /** */ - private class CursorIteratorWrapper extends GridCloseableIteratorAdapter<Row> { + private Value[] values(CacheObjectValueContext cctx, ExecutionContext<Row> ectx, Row row) { + try { + RowHandler<Row> rowHnd = ectx.rowHandler(); + int rowLen = rowHnd.columnCount(row); + + Value[] values = new Value[rowLen]; + for (int i = 0; i < rowLen; i++) { + Object o = rowHnd.get(i, row); + + if (o != null) + values[i] = H2Utils.wrap(cctx, o, DataType.getTypeFromClass(o.getClass())); + } + + return values; + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to wrap object into H2 Value.", e); + } + } + + /** */ + private class IteratorImpl extends GridIteratorAdapter<Row> { /** */ private final GridCursor<H2Row> cursor; /** Next element. */ private Row next; - /** - * @param cursor Cursor. - */ - CursorIteratorWrapper(GridCursor<H2Row> cursor) { - assert cursor != null; + /** */ + public IteratorImpl(@NotNull GridCursor<H2Row> cursor) { this.cursor = cursor; } /** {@inheritDoc} */ - @Override protected Row onNext() { - if (next == null) - throw new NoSuchElementException(); - - Row res = next; - - next = null; - - return res; - } + @Override public boolean hasNextX() throws IgniteCheckedException { + assert cursor != null; - /** {@inheritDoc} */ - @Override protected boolean onHasNext() throws IgniteCheckedException { if (next != null) return true; @@ -259,58 +279,22 @@ public class IndexScan<Row> implements Iterable<Row> { } /** {@inheritDoc} */ - @Override protected void onClose() { - releasePartitions(); - } - } - - /** */ - private static class CalciteH2Row<Row> extends H2Row { - /** */ - private final Value[] values; - - /** */ - CalciteH2Row(CacheObjectValueContext coCtx, ExecutionContext<Row> ctx, Row row) { - try { - RowHandler<Row> rowHnd = ctx.rowHandler(); - - int colCnt = rowHnd.columnCount(row); - - values = new Value[colCnt]; - - for (int i = 0; i < colCnt; i++) { - Object o = rowHnd.get(i, row); - - if (o != null) { - Value v = H2Utils.wrap(coCtx, o, DataType.getTypeFromClass(o.getClass())); + @Override public Row nextX() { + assert cursor != null; - values[i] = v; - } - } - } - catch (IgniteCheckedException e) { - throw new IgniteException("Failed to wrap object into H2 Value.", e); - } - } + if (next == null) + throw new NoSuchElementException(); - /** {@inheritDoc} */ - @Override public boolean indexSearchRow() { - return true; - } + Row res = next; - /** {@inheritDoc} */ - @Override public int getColumnCount() { - return values.length; - } + next = null; - /** {@inheritDoc} */ - @Override public Value getValue(int idx) { - return values[idx]; + return res; } /** {@inheritDoc} */ - @Override public void setValue(int idx, Value v) { - throw new AssertionError("Not supported."); + @Override public void removeX() { + throw new UnsupportedOperationException("Remove is not supported."); } } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java index bb4297a..8b9ae84 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java @@ -400,7 +400,7 @@ public class LogicalRelImplementor<Row> implements IgniteRelVisitor<Node<Row>> { } /** */ - public Node<Row> go(IgniteRel rel) { - return visit(rel); + public <T extends Node<Row>> T go(IgniteRel rel) { + return (T)visit(rel); } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistry.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistry.java index d49009d..8c7c2d4 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistry.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistry.java @@ -81,16 +81,20 @@ public interface MailboxRegistry extends Service { /** * Returns all registered inboxes for provided query ID. * - * @param qryId Query ID. {@code null} means return all registered inboxes. + * @param qryId Query ID. {@code null} means return inboxes with any query id. + * @param fragmentId Fragment Id. {@code -1} means return inboxes with any fragment id. + * @param exchangeId Exchange Id. {@code -1} means return inboxes with any exchange id. * @return Registered inboxes. */ - Collection<Inbox<?>> inboxes(@Nullable UUID qryId); + Collection<Inbox<?>> inboxes(@Nullable UUID qryId, long fragmentId, long exchangeId); /** * Returns all registered outboxes for provided query ID. * - * @param qryId Query ID. {@code null} means return all registered outboxes. + * @param qryId Query ID. {@code null} means return outboxes with any query id. + * @param fragmentId Fragment Id. {@code -1} means return outboxes with any fragment id. + * @param exchangeId Exchange Id. {@code -1} means return outboxes with any exchange id. * @return Registered outboxes. */ - Collection<Outbox<?>> outboxes(@Nullable UUID qryId); + Collection<Outbox<?>> outboxes(@Nullable UUID qryId, long fragmentId, long exchangeId); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistryImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistryImpl.java index 77e1101..39ace60 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistryImpl.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistryImpl.java @@ -19,12 +19,18 @@ package org.apache.ignite.internal.processors.query.calcite.exec; import java.util.Collection; import java.util.Map; +import java.util.Objects; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Predicate; import java.util.stream.Collectors; +import org.apache.ignite.events.EventType; import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener; +import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager; import org.apache.ignite.internal.processors.query.calcite.exec.rel.Inbox; +import org.apache.ignite.internal.processors.query.calcite.exec.rel.Mailbox; import org.apache.ignite.internal.processors.query.calcite.exec.rel.Outbox; import org.apache.ignite.internal.processors.query.calcite.util.AbstractService; import org.jetbrains.annotations.Nullable; @@ -34,11 +40,20 @@ import org.jetbrains.annotations.Nullable; */ public class MailboxRegistryImpl extends AbstractService implements MailboxRegistry { /** */ + private static final Predicate<Mailbox<?>> ALWAYS_TRUE = o -> true; + + /** */ private final Map<MailboxKey, Outbox<?>> locals; /** */ private final Map<MailboxKey, Inbox<?>> remotes; + /** */ + private final DiscoveryEventListener discoLsnr; + + /** */ + private GridEventStorageManager evtMgr; + /** * @param ctx Kernal. */ @@ -47,6 +62,25 @@ public class MailboxRegistryImpl extends AbstractService implements MailboxRegis locals = new ConcurrentHashMap<>(); remotes = new ConcurrentHashMap<>(); + + discoLsnr = (e, c) -> onNodeLeft(e.eventNode().id()); + } + + /** {@inheritDoc} */ + @Override public void onStart(GridKernalContext ctx) { + eventManager(ctx.event()); + + init(); + } + + /** {@inheritDoc} */ + @Override public void init() { + eventManager().addDiscoveryEventListener(discoLsnr, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT); + } + + /** {@inheritDoc} */ + @Override public void tearDown() { + eventManager().removeDiscoveryEventListener(discoLsnr, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT); } /** {@inheritDoc} */ @@ -58,7 +92,7 @@ public class MailboxRegistryImpl extends AbstractService implements MailboxRegis /** {@inheritDoc} */ @Override public void unregister(Inbox<?> inbox) { - remotes.remove(new MailboxKey(inbox.queryId(), inbox.exchangeId())); + remotes.remove(new MailboxKey(inbox.queryId(), inbox.exchangeId()), inbox); } /** {@inheritDoc} */ @@ -70,7 +104,7 @@ public class MailboxRegistryImpl extends AbstractService implements MailboxRegis /** {@inheritDoc} */ @Override public void unregister(Outbox<?> outbox) { - locals.remove(new MailboxKey(outbox.queryId(), outbox.exchangeId())); + locals.remove(new MailboxKey(outbox.queryId(), outbox.exchangeId()), outbox); } /** {@inheritDoc} */ @@ -84,27 +118,52 @@ public class MailboxRegistryImpl extends AbstractService implements MailboxRegis } /** {@inheritDoc} */ - @Override public Collection<Inbox<?>> inboxes(@Nullable UUID qryId) { - if (qryId == null) - return remotes.values(); - - return remotes.entrySet().stream() - .filter(e -> e.getKey().qryId.equals(qryId)) - .map(Map.Entry::getValue) + @Override public Collection<Inbox<?>> inboxes(@Nullable UUID qryId, long fragmentId, long exchangeId) { + return remotes.values().stream() + .filter(makeFilter(qryId, fragmentId, exchangeId)) .collect(Collectors.toList()); } /** {@inheritDoc} */ - @Override public Collection<Outbox<?>> outboxes(@Nullable UUID qryId) { - if (qryId == null) - return locals.values(); - - return locals.entrySet().stream() - .filter(e -> e.getKey().qryId.equals(qryId)) - .map(Map.Entry::getValue) + @Override public Collection<Outbox<?>> outboxes(@Nullable UUID qryId, long fragmentId, long exchangeId) { + return locals.values().stream() + .filter(makeFilter(qryId, fragmentId, exchangeId)) .collect(Collectors.toList()); } + /** + * @param evtMgr Event manager. + */ + public void eventManager(GridEventStorageManager evtMgr) { + this.evtMgr = evtMgr; + } + + /** + * @return Event manager. + */ + public GridEventStorageManager eventManager() { + return evtMgr; + } + + /** */ + private void onNodeLeft(UUID nodeId) { + locals.values().forEach(n -> n.onNodeLeft(nodeId)); + remotes.values().forEach(n -> n.onNodeLeft(nodeId)); + } + + /** */ + private static Predicate<Mailbox<?>> makeFilter(@Nullable UUID qryId, long fragmentId, long exchangeId) { + Predicate<Mailbox<?>> filter = ALWAYS_TRUE; + if (qryId != null) + filter = filter.and(mailbox -> Objects.equals(mailbox.queryId(), qryId)); + if (fragmentId != -1) + filter = filter.and(mailbox -> mailbox.fragmentId() == fragmentId); + if (exchangeId != -1) + filter = filter.and(mailbox -> mailbox.exchangeId() == exchangeId); + + return filter; + } + /** */ private static class MailboxKey { /** */ diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java deleted file mode 100644 index 090abc7..0000000 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java +++ /dev/null @@ -1,251 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.query.calcite.exec; - -import java.util.ArrayDeque; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.NoSuchElementException; -import java.util.Queue; - -import org.apache.calcite.rel.type.RelDataType; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.CacheStoppedException; -import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; -import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; -import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; -import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology; -import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; -import org.apache.ignite.internal.processors.query.IgniteSQLException; -import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory; -import org.apache.ignite.internal.processors.query.calcite.schema.TableDescriptor; -import org.apache.ignite.internal.processors.query.calcite.util.Commons; -import org.apache.ignite.internal.util.GridCloseableIteratorAdapter; -import org.apache.ignite.internal.util.lang.GridCursor; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.U; - -/** */ -public class TableScan<Row> implements Iterable<Row> { - /** */ - private final ExecutionContext<Row> ectx; - - /** */ - private final TableDescriptor desc; - - /** */ - private final RowFactory<Row> factory; - - /** */ - public TableScan(ExecutionContext<Row> ectx, TableDescriptor desc) { - this.ectx = ectx; - this.desc = desc; - - RelDataType rowType = desc.selectRowType(ectx.getTypeFactory()); - factory = ectx.rowHandler().factory(ectx.getTypeFactory(), rowType); - } - - /** {@inheritDoc} */ - @Override public Iterator<Row> iterator() { - try { - return new IteratorImpl().init(); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } - } - - /** - * Table scan iterator. - */ - private class IteratorImpl extends GridCloseableIteratorAdapter<Row> { - /** */ - private int cacheId; - - /** */ - private Queue<GridDhtLocalPartition> parts; - - /** */ - private GridDhtLocalPartition part; - - /** */ - private GridCursor<? extends CacheDataRow> cur; - - /** */ - private Row next; - - /** {@inheritDoc} */ - @Override protected Row onNext() { - if (next == null) - throw new NoSuchElementException(); - - Row next = this.next; - - this.next = null; - - return next; - } - - /** {@inheritDoc} */ - @Override protected boolean onHasNext() throws IgniteCheckedException { - assert parts != null; - - if (next != null) - return true; - - while (true) { - if (cur == null) { - if ((part = parts.poll()) == null) - break; - - cur = part.dataStore().cursor(cacheId, ectx.mvccSnapshot()); - } - - if (cur.next()) { - CacheDataRow row = cur.get(); - - if (!desc.match(row)) - continue; - - next = desc.toRow(ectx, row, factory); - - break; - } else { - cur = null; - - part.release(); - part = null; - } - } - - return next != null; - } - - /** {@inheritDoc} */ - @Override protected void onClose() { - if (part != null) - part.release(); - - part = null; - - while (!F.isEmpty(parts)) - parts.poll().release(); - - parts = null; - } - - /** */ - public Iterator<Row> init() throws IgniteCheckedException { - if (isClosed()) - return Collections.emptyIterator(); - - GridCacheContext<?, ?> cctx = desc.cacheContext(); - - if (!cctx.gate().enterIfNotStopped()) { - close(); - - throw new CacheStoppedException(cctx.name()); - } - - try { - GridDhtPartitionTopology top = cctx.topology(); - - top.readLock(); - try { - GridDhtTopologyFuture fut = top.topologyVersionFuture(); - AffinityTopologyVersion topVer = ectx.planningContext().topologyVersion(); - - if (!fut.isDone() || fut.topologyVersion().compareTo(topVer) != 0) - throw new ClusterTopologyCheckedException("Failed to execute query. Retry on stable topology."); - - if (cctx.isPartitioned()) - reservePartitioned(top); - else - reserveReplicated(top); - } - finally { - top.readUnlock(); - } - - cacheId = cctx.cacheId(); - - return this; - } - catch (Exception e) { - Commons.closeQuiet(this, e); - - throw e; - } - finally { - cctx.gate().leave(); - } - } - - /** */ - private void reserveReplicated(GridDhtPartitionTopology top) { - List<GridDhtLocalPartition> locParts = top.localPartitions(); - - parts = new ArrayDeque<>(locParts.size()); - - for (GridDhtLocalPartition local : locParts) { - if (!local.reserve()) - throw reservationException(); - else if (local.state() != GridDhtPartitionState.OWNING) { - local.release(); - - throw reservationException(); - } - - parts.offer(local); - } - } - - /** */ - private void reservePartitioned(GridDhtPartitionTopology top) { - AffinityTopologyVersion topVer = ectx.planningContext().topologyVersion(); - int[] partitions = ectx.partitions(); - - assert topVer != null && !F.isEmpty(partitions); - - parts = new ArrayDeque<>(partitions.length); - - for (int p : partitions) { - GridDhtLocalPartition loc = top.localPartition(p, topVer, false); - - if (loc == null || !loc.reserve()) - throw reservationException(); - else if (loc.state() != GridDhtPartitionState.OWNING) { - loc.release(); - - throw reservationException(); - } - - parts.offer(loc); - } - } - - /** */ - private IgniteSQLException reservationException() { - return new IgniteSQLException("Failed to reserve partition for query execution. Retry on stable topology."); - } - } -} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractJoinNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractJoinNode.java index 5332485..a64e48b 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractJoinNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractJoinNode.java @@ -71,6 +71,9 @@ public abstract class AbstractJoinNode<Row> extends AbstractNode<Row> { @Override public void request(int rowsCnt) { checkThread(); + if (isClosed()) + return; + assert !F.isEmpty(sources) && sources.size() == 2; assert rowsCnt > 0 && requested == 0; @@ -124,6 +127,9 @@ public abstract class AbstractJoinNode<Row> extends AbstractNode<Row> { private void pushLeft(Row row) { checkThread(); + if (isClosed()) + return; + assert downstream != null; assert waitingLeft > 0; @@ -138,6 +144,9 @@ public abstract class AbstractJoinNode<Row> extends AbstractNode<Row> { private void pushRight(Row row) { checkThread(); + if (isClosed()) + return; + assert downstream != null; assert waitingRight > 0; @@ -153,6 +162,9 @@ public abstract class AbstractJoinNode<Row> extends AbstractNode<Row> { private void endLeft() { checkThread(); + if (isClosed()) + return; + assert downstream != null; assert waitingLeft > 0; @@ -165,6 +177,9 @@ public abstract class AbstractJoinNode<Row> extends AbstractNode<Row> { private void endRight() { checkThread(); + if (isClosed()) + return; + assert downstream != null; assert waitingRight > 0; diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java index 67a2718..bd1ad5e 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java @@ -37,10 +37,10 @@ public abstract class AbstractNode<Row> implements Node<Row> { protected static final int MODIFY_BATCH_SIZE = IgniteSystemProperties.getInteger("IGNITE_CALCITE_EXEC_BATCH_SIZE", 100); /** */ - protected static final int IO_BATCH_SIZE = IgniteSystemProperties.getInteger("IGNITE_CALCITE_EXEC_IO_BATCH_SIZE", 200); + protected static final int IO_BATCH_SIZE = IgniteSystemProperties.getInteger("IGNITE_CALCITE_EXEC_IO_BATCH_SIZE", 256); /** */ - protected static final int IO_BATCH_CNT = IgniteSystemProperties.getInteger("IGNITE_CALCITE_EXEC_IO_BATCH_CNT", 50); + protected static final int IO_BATCH_CNT = IgniteSystemProperties.getInteger("IGNITE_CALCITE_EXEC_IO_BATCH_CNT", 4); /** for debug purpose */ private volatile Thread thread; @@ -58,6 +58,9 @@ public abstract class AbstractNode<Row> implements Node<Row> { /** */ protected List<Node<Row>> sources; + /** */ + protected boolean closed; + /** * @param ctx Execution context. */ @@ -84,13 +87,23 @@ public abstract class AbstractNode<Row> implements Node<Row> { } /** {@inheritDoc} */ - @Override public void cancel() { + @Override public void close() { checkThread(); - context().markCancelled(); + if (closed) + return; + + closed = true; if (!F.isEmpty(sources)) - sources.forEach(Node::cancel); + sources.forEach(U::closeQuiet); + } + + /** + * @return {@code true} if the subtree is canceled. + */ + protected boolean isClosed() { + return closed; } /** */ diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AggregateNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AggregateNode.java index 347f533..f496b56 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AggregateNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AggregateNode.java @@ -97,6 +97,9 @@ public class AggregateNode<Row> extends AbstractNode<Row> implements SingleNode< @Override public void request(int rowsCnt) { checkThread(); + if (isClosed()) + return; + assert !F.isEmpty(sources) && sources.size() == 1; assert rowsCnt > 0 && requested == 0; @@ -114,6 +117,9 @@ public class AggregateNode<Row> extends AbstractNode<Row> implements SingleNode< @Override public void push(Row row) { checkThread(); + if (isClosed()) + return; + assert downstream != null; assert waiting > 0; @@ -135,6 +141,9 @@ public class AggregateNode<Row> extends AbstractNode<Row> implements SingleNode< @Override public void end() { checkThread(); + if (isClosed()) + return; + assert downstream != null; assert waiting > 0; diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/FilterNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/FilterNode.java index e7202fd..19c87d3 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/FilterNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/FilterNode.java @@ -57,6 +57,9 @@ public class FilterNode<Row> extends AbstractNode<Row> implements SingleNode<Row @Override public void request(int rowsCnt) { checkThread(); + if (isClosed()) + return; + assert !F.isEmpty(sources) && sources.size() == 1; assert rowsCnt > 0 && requested == 0; @@ -70,6 +73,9 @@ public class FilterNode<Row> extends AbstractNode<Row> implements SingleNode<Row @Override public void push(Row row) { checkThread(); + if (isClosed()) + return; + assert downstream != null; assert waiting > 0; @@ -90,6 +96,9 @@ public class FilterNode<Row> extends AbstractNode<Row> implements SingleNode<Row @Override public void end() { checkThread(); + if (isClosed()) + return; + assert downstream != null; assert waiting > 0; diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java index ce82e36..5e7e281 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java @@ -17,27 +17,30 @@ package org.apache.ignite.internal.processors.query.calcite.exec.rel; -import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.PriorityQueue; import java.util.UUID; +import java.util.stream.Collectors; import org.apache.calcite.util.Pair; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.processors.query.calcite.exec.ExchangeService; import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; import org.apache.ignite.internal.processors.query.calcite.exec.MailboxRegistry; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; /** * A part of exchange. */ -public class Inbox<Row> extends AbstractNode<Row> implements SingleNode<Row>, AutoCloseable { +public class Inbox<Row> extends AbstractNode<Row> implements Mailbox<Row>, SingleNode<Row> { /** */ private final ExchangeService exchange; @@ -54,7 +57,7 @@ public class Inbox<Row> extends AbstractNode<Row> implements SingleNode<Row>, Au private final Map<UUID, Buffer> perNodeBuffers; /** */ - private Collection<UUID> nodes; + private volatile Collection<UUID> srcNodeIds; /** */ private Comparator<Row> comp; @@ -92,17 +95,8 @@ public class Inbox<Row> extends AbstractNode<Row> implements SingleNode<Row>, Au perNodeBuffers = new HashMap<>(); } - /** - * @return Query ID. - */ - public UUID queryId() { - return context().queryId(); - } - - /** - * @return Exchange ID. - */ - public long exchangeId() { + /** {@inheritDoc} */ + @Override public long exchangeId() { return exchangeId; } @@ -110,48 +104,44 @@ public class Inbox<Row> extends AbstractNode<Row> implements SingleNode<Row>, Au * Inits this Inbox. * * @param ctx Execution context. - * @param nodes Source nodes. + * @param srcNodeIds Source node IDs. * @param comp Optional comparator for merge exchange. */ - public void init(ExecutionContext<Row> ctx, Collection<UUID> nodes, Comparator<Row> comp) { + public void init(ExecutionContext<Row> ctx, Collection<UUID> srcNodeIds, @Nullable Comparator<Row> comp) { // It's important to set proper context here because // because the one, that is created on a first message - // recived doesn't have all context variables in place. + // received doesn't have all context variables in place. this.ctx = ctx; - this.nodes = nodes; this.comp = comp; + + // memory barier + this.srcNodeIds = new HashSet<>(srcNodeIds); } /** {@inheritDoc} */ @Override public void request(int rowsCnt) { checkThread(); - assert nodes != null; + if (isClosed()) + return; + + assert srcNodeIds != null; assert rowsCnt > 0 && requested == 0; requested = rowsCnt; - if (buffers == null) { - nodes.forEach(this::getOrCreateBuffer); - buffers = new ArrayList<>(perNodeBuffers.values()); - - assert buffers.size() == nodes.size(); - } - if (!inLoop) context().execute(this::pushInternal); } /** {@inheritDoc} */ - @Override public void cancel() { - checkThread(); - context().markCancelled(); - close(); - } - - /** {@inheritDoc} */ @Override public void close() { + if (isClosed()) + return; + registry.unregister(this); + + super.close(); } /** {@inheritDoc} */ @@ -175,6 +165,9 @@ public class Inbox<Row> extends AbstractNode<Row> implements SingleNode<Row>, Au public void onBatchReceived(UUID src, int batchId, boolean last, List<Row> rows) { checkThread(); + if (isClosed()) + return; + Buffer buf = getOrCreateBuffer(src); boolean waitingBefore = buf.check() == State.WAITING; @@ -185,20 +178,47 @@ public class Inbox<Row> extends AbstractNode<Row> implements SingleNode<Row>, Au pushInternal(); } + /** + * @param e Error. + */ + private void onError(Throwable e) { + checkThread(); + + assert downstream != null; + + downstream.onError(e); + + close(); + } + /** */ private void pushInternal() { + if (isClosed()) + return; + assert downstream != null; inLoop = true; + try { + if (buffers == null) { + for (UUID node : srcNodeIds) + checkNode(node); + + buffers = srcNodeIds.stream() + .map(this::getOrCreateBuffer) + .collect(Collectors.toList()); + + assert buffers.size() == perNodeBuffers.size(); + } + if (comp != null) pushOrdered(); else pushUnordered(); } - catch (Exception e) { - downstream.onError(e); - close(); + catch (Throwable e) { + onError(e); } finally { inLoop = false; @@ -231,7 +251,7 @@ public class Inbox<Row> extends AbstractNode<Row> implements SingleNode<Row>, Au } while (requested > 0 && !heap.isEmpty()) { - if (context().cancelled()) + if (isClosed()) return; Buffer buf = heap.poll().right; @@ -259,8 +279,6 @@ public class Inbox<Row> extends AbstractNode<Row> implements SingleNode<Row>, Au downstream.end(); requested = 0; - - close(); } } @@ -269,7 +287,7 @@ public class Inbox<Row> extends AbstractNode<Row> implements SingleNode<Row>, Au int idx = 0, noProgress = 0; while (requested > 0 && !buffers.isEmpty()) { - if (context().cancelled()) + if (isClosed()) return; Buffer buf = buffers.get(idx); @@ -299,8 +317,6 @@ public class Inbox<Row> extends AbstractNode<Row> implements SingleNode<Row>, Au if (requested > 0 && buffers.isEmpty()) { downstream.end(); requested = 0; - - close(); } } @@ -320,6 +336,28 @@ public class Inbox<Row> extends AbstractNode<Row> implements SingleNode<Row>, Au } /** */ + public void onNodeLeft(UUID nodeId) { + if (ctx.originatingNodeId().equals(nodeId) && srcNodeIds == null) + ctx.execute(this::close); + else if (srcNodeIds != null && srcNodeIds.contains(nodeId)) + ctx.execute(() -> onNodeLeft0(nodeId)); + } + + /** */ + private void onNodeLeft0(UUID nodeId) { + checkThread(); + + if (getOrCreateBuffer(nodeId).check() != State.END) + onError(new ClusterTopologyCheckedException("Node left [nodeId=" + nodeId + ']')); + } + + /** */ + private void checkNode(UUID nodeId) throws ClusterTopologyCheckedException { + if (!exchange.alive(nodeId)) + throw new ClusterTopologyCheckedException("Node left [nodeId=" + nodeId + ']'); + } + + /** */ private static final class Batch<Row> implements Comparable<Batch<Row>> { /** */ private final int batchId; diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Mailbox.java similarity index 58% copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Mailbox.java index 3444347..0dc7077 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Mailbox.java @@ -15,30 +15,28 @@ * limitations under the License. */ -package org.apache.ignite.internal.processors.query.calcite.message; +package org.apache.ignite.internal.processors.query.calcite.exec.rel; import java.util.UUID; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.processors.query.calcite.util.Service; +/** */ +public interface Mailbox<T> extends Node<T> { + /** + * @return Query ID. + */ + default UUID queryId() { + return context().queryId(); + } -/** - * - */ -public interface MessageService extends Service { /** - * Sends a message to given node. - * - * @param nodeId Node ID. - * @param msg Message. + * @return Fragment ID. */ - void send(UUID nodeId, CalciteMessage msg) throws IgniteCheckedException; + default long fragmentId() { + return context().fragmentId(); + } /** - * Registers a listener for messages of a given type. - * - * @param lsnr Listener. - * @param type Message type. + * @return Exchange ID. */ - void register(MessageListener lsnr, MessageType type); + long exchangeId(); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java index d291413..773a173 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java @@ -91,6 +91,9 @@ public class ModifyNode<Row> extends AbstractNode<Row> implements SingleNode<Row @Override public void request(int rowsCnt) { checkThread(); + if (isClosed()) + return; + assert !F.isEmpty(sources) && sources.size() == 1; assert rowsCnt > 0 && requested == 0; @@ -104,6 +107,9 @@ public class ModifyNode<Row> extends AbstractNode<Row> implements SingleNode<Row @Override public void push(Row row) { checkThread(); + if (isClosed()) + return; + assert downstream != null; assert waiting > 0; assert state == State.UPDATING; @@ -134,6 +140,9 @@ public class ModifyNode<Row> extends AbstractNode<Row> implements SingleNode<Row @Override public void end() { checkThread(); + if (isClosed()) + return; + assert downstream != null; assert waiting > 0; diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Node.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Node.java index 33f14ba..6c99f4d 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Node.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Node.java @@ -27,7 +27,7 @@ import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext * <p/><b>Note</b>: except several cases (like consumer node and mailboxes), {@link Node#request(int)}, {@link Node#cancel()}, * {@link Downstream#push(Object)} and {@link Downstream#end()} methods should be used from one single thread. */ -public interface Node<Row> { +public interface Node<Row> extends AutoCloseable { /** * Returns runtime context allowing access to the tables in a database. * @@ -53,9 +53,4 @@ public interface Node<Row> { * Requests next bunch of rows. */ void request(int rowsCnt); - - /** - * Cancels execution. - */ - void cancel(); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java index b81223f..199f528 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java @@ -38,7 +38,7 @@ import org.apache.ignite.internal.util.typedef.internal.U; /** * A part of exchange. */ -public class Outbox<Row> extends AbstractNode<Row> implements SingleNode<Row>, Downstream<Row>, AutoCloseable { +public class Outbox<Row> extends AbstractNode<Row> implements Mailbox<Row>, SingleNode<Row>, Downstream<Row> { /** */ private final ExchangeService exchange; @@ -61,9 +61,6 @@ public class Outbox<Row> extends AbstractNode<Row> implements SingleNode<Row>, D private final Map<UUID, Buffer> nodeBuffers = new HashMap<>(); /** */ - private boolean cancelled; - - /** */ private int waiting; /** @@ -78,8 +75,8 @@ public class Outbox<Row> extends AbstractNode<Row> implements SingleNode<Row>, D ExecutionContext<Row> ctx, ExchangeService exchange, MailboxRegistry registry, - long exchangeId, long - targetFragmentId, + long exchangeId, + long targetFragmentId, Destination dest ) { super(ctx); @@ -90,17 +87,8 @@ public class Outbox<Row> extends AbstractNode<Row> implements SingleNode<Row>, D this.dest = dest; } - /** - * @return Query ID. - */ - public UUID queryId() { - return context().queryId(); - } - - /** - * @return Exchange ID. - */ - public long exchangeId() { + /** {@inheritDoc} */ + @Override public long exchangeId() { return exchangeId; } @@ -111,7 +99,11 @@ public class Outbox<Row> extends AbstractNode<Row> implements SingleNode<Row>, D * @param batchId Batch ID. */ public void onAcknowledge(UUID nodeId, int batchId) { - nodeBuffers.get(nodeId).onAcknowledge(batchId); + Buffer buffer = nodeBuffers.get(nodeId); + + assert buffer != null; + + buffer.onAcknowledge(batchId); } /** */ @@ -125,6 +117,9 @@ public class Outbox<Row> extends AbstractNode<Row> implements SingleNode<Row>, D @Override public void push(Row row) { checkThread(); + if (isClosed()) + return; + assert waiting > 0; waiting--; @@ -138,6 +133,9 @@ public class Outbox<Row> extends AbstractNode<Row> implements SingleNode<Row>, D @Override public void end() { checkThread(); + if (isClosed()) + return; + assert waiting > 0; waiting = -1; @@ -145,8 +143,6 @@ public class Outbox<Row> extends AbstractNode<Row> implements SingleNode<Row>, D try { for (UUID node : dest.targets()) getOrCreateBuffer(node).end(); - - close(); } catch (Exception e) { onError(e); @@ -158,30 +154,29 @@ public class Outbox<Row> extends AbstractNode<Row> implements SingleNode<Row>, D U.error(context().planningContext().logger(), "Error occurred during execution: " + X.getFullStackTrace(e)); - cancel(); // TODO send cause to originator. - } - - /** {@inheritDoc} */ - @Override public void cancel() { - checkThread(); - - context().markCancelled(); - - if (cancelled) - return; - - cancelled = true; - - nodeBuffers.values().forEach(Buffer::cancel); + try { + sendError(e); + } + catch (IgniteCheckedException ex) { + U.error(context().planningContext().logger(), + "Error occurred during send error message: " + X.getFullStackTrace(e)); + } close(); - - super.cancel(); } /** {@inheritDoc} */ @Override public void close() { + if (isClosed()) + return; + registry.unregister(this); + + // Send cancel message for the Inbox to close Inboxes created by batch message race. + for (UUID node : dest.targets()) + getOrCreateBuffer(node).close(); + + super.close(); } /** {@inheritDoc} */ @@ -208,9 +203,14 @@ public class Outbox<Row> extends AbstractNode<Row> implements SingleNode<Row>, D } /** */ - private void sendCancel(UUID nodeId, int batchId) { + private void sendError(Throwable err) throws IgniteCheckedException { + exchange.sendError(ctx.originatingNodeId(), queryId(), fragmentId(), err); + } + + /** */ + private void sendInboxClose(UUID nodeId) { try { - exchange.cancel(nodeId, queryId(), targetFragmentId, exchangeId, batchId); + exchange.closeInbox(nodeId, queryId(), targetFragmentId, exchangeId); } catch (IgniteCheckedException e) { U.warn(context().planningContext().logger(), "Failed to send cancel message.", e); @@ -264,6 +264,12 @@ public class Outbox<Row> extends AbstractNode<Row> implements SingleNode<Row>, D } /** */ + public void onNodeLeft(UUID nodeId) { + if (nodeId.equals(ctx.originatingNodeId())) + ctx.execute(this::close); + } + + /** */ private final class Buffer { /** */ private final UUID nodeId; @@ -318,15 +324,18 @@ public class Outbox<Row> extends AbstractNode<Row> implements SingleNode<Row>, D } /** */ - public void cancel() { + public void close() { + int currBatchId = hwm; + if (hwm == Integer.MAX_VALUE) return; - int batchId = hwm + 1; hwm = Integer.MAX_VALUE; curr = null; - sendCancel(nodeId, batchId); + + if (currBatchId >= 0) + sendInboxClose(nodeId); } /** diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ProjectNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ProjectNode.java index 0c2c2ab..afe7fc0 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ProjectNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ProjectNode.java @@ -43,6 +43,9 @@ public class ProjectNode<Row> extends AbstractNode<Row> implements SingleNode<Ro @Override public void request(int rowsCnt) { checkThread(); + if (isClosed()) + return; + assert !F.isEmpty(sources) && sources.size() == 1; assert rowsCnt > 0; @@ -53,6 +56,9 @@ public class ProjectNode<Row> extends AbstractNode<Row> implements SingleNode<Ro @Override public void push(Row row) { checkThread(); + if (isClosed()) + return; + assert downstream != null; try { @@ -67,6 +73,9 @@ public class ProjectNode<Row> extends AbstractNode<Row> implements SingleNode<Ro @Override public void end() { checkThread(); + if (isClosed()) + return; + assert downstream != null; downstream.end(); diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java index ca4c370..68da4fd 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java @@ -22,9 +22,9 @@ import java.util.Deque; import java.util.Iterator; import java.util.NoSuchElementException; import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; -import java.util.function.Consumer; import org.apache.ignite.IgniteInterruptedException; import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; @@ -35,8 +35,7 @@ import org.apache.ignite.internal.util.typedef.F; /** * Client iterator. */ -public class RootNode<Row> extends AbstractNode<Row> - implements SingleNode<Row>, Downstream<Row>, Iterator<Row>, AutoCloseable { +public class RootNode<Row> extends AbstractNode<Row> implements SingleNode<Row>, Downstream<Row>, Iterator<Row> { /** */ private final ReentrantLock lock; @@ -47,13 +46,13 @@ public class RootNode<Row> extends AbstractNode<Row> private final Deque<Row> buff; /** */ - private final Consumer<RootNode<Row>> onClose; + private final Runnable onClose; /** */ private volatile State state = State.RUNNING; /** */ - private volatile IgniteSQLException ex; + private final AtomicReference<Throwable> ex = new AtomicReference<>(); /** */ private Row row; @@ -64,15 +63,27 @@ public class RootNode<Row> extends AbstractNode<Row> /** * @param ctx Execution context. */ - public RootNode(ExecutionContext<Row> ctx, Consumer<RootNode<Row>> onClose) { + public RootNode(ExecutionContext<Row> ctx) { super(ctx); - this.onClose = onClose; + buff = new ArrayDeque<>(IN_BUFFER_SIZE); + lock = new ReentrantLock(); + cond = lock.newCondition(); - // extra space for possible END marker - buff = new ArrayDeque<>(IN_BUFFER_SIZE + 1); + onClose = this::proceedClose; + } + + /** + * @param ctx Execution context. + */ + public RootNode(ExecutionContext<Row> ctx, Runnable onClose) { + super(ctx); + + buff = new ArrayDeque<>(IN_BUFFER_SIZE); lock = new ReentrantLock(); cond = lock.newCondition(); + + this.onClose = onClose; } /** */ @@ -80,39 +91,38 @@ public class RootNode<Row> extends AbstractNode<Row> return context().queryId(); } - /** {@inheritDoc} */ - @Override public void cancel() { - if (state != State.RUNNING) - return; + /** */ + public void proceedClose() { + context().execute(() -> { + checkThread(); + if (isClosed()) + return; + + buff.clear(); + + super.close(); + }); + } + + /** {@inheritDoc} */ + @Override public void close() { lock.lock(); try { - if (state != State.RUNNING) + if (state == State.RUNNING) + state = State.CANCELLED; + else if (state == State.END) + state = State.CLOSED; + else return; - context().markCancelled(); - state = State.CANCELLED; - buff.clear(); cond.signalAll(); } finally { lock.unlock(); } - - context().execute(F.first(sources)::cancel); - onClose.accept(this); - } - /** - * @return Execution state. - */ - public State state() { - return state; - } - - /** {@inheritDoc} */ - @Override public void close() { - cancel(); + onClose.run(); } /** {@inheritDoc} */ @@ -123,7 +133,7 @@ public class RootNode<Row> extends AbstractNode<Row> lock.lock(); try { - assert waiting > 0; + assert waiting > 0 : "waiting=" + waiting; waiting--; @@ -149,7 +159,7 @@ public class RootNode<Row> extends AbstractNode<Row> @Override public void end() { lock.lock(); try { - assert waiting > 0; + assert waiting > 0 : "waiting=" + waiting; waiting = -1; @@ -165,18 +175,17 @@ public class RootNode<Row> extends AbstractNode<Row> /** {@inheritDoc} */ @Override public void onError(Throwable e) { - checkThread(); + if (!ex.compareAndSet(null, e)) + ex.get().addSuppressed(e); - ex = new IgniteSQLException("An error occurred while query executing.", IgniteQueryErrorCode.UNKNOWN, e); - - cancel(); + close(); } /** {@inheritDoc} */ @Override public boolean hasNext() { if (row != null) return true; - else if (state == State.END) + else if (state == State.END || state == State.CLOSED) return false; else return (row = take()) != null; @@ -217,10 +226,10 @@ public class RootNode<Row> extends AbstractNode<Row> lock.lock(); try { - checkCancelled(); - assert state == State.RUNNING; - while (true) { + checkCancelled(); + assert state == State.RUNNING; + if (!buff.isEmpty()) return buff.poll(); else if (waiting == -1) @@ -231,9 +240,6 @@ public class RootNode<Row> extends AbstractNode<Row> } cond.await(); - - checkCancelled(); - assert state == State.RUNNING; } state = State.END; @@ -247,7 +253,7 @@ public class RootNode<Row> extends AbstractNode<Row> assert state == State.END; - onClose.accept(this); + close(); return null; } @@ -255,22 +261,21 @@ public class RootNode<Row> extends AbstractNode<Row> /** */ private void checkCancelled() { if (state == State.CANCELLED) { - if (ex != null) - throw ex; + ex.compareAndSet(null, new IgniteSQLException("The query was cancelled while executing.", IgniteQueryErrorCode.QUERY_CANCELED)); - throw new IgniteSQLException("The query was cancelled while executing.", IgniteQueryErrorCode.QUERY_CANCELED); + throw sqlException(ex.get()); } } /** */ - public enum State { - /** */ - RUNNING, - - /** */ - CANCELLED, + private IgniteSQLException sqlException(Throwable e) { + return e instanceof IgniteSQLException + ? (IgniteSQLException)e + : new IgniteSQLException("An error occurred while query executing.", IgniteQueryErrorCode.UNKNOWN, e); + } - /** */ - END + /** */ + private enum State { + RUNNING, CANCELLED, END, CLOSED } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java index 72df1e4..fae9530 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java @@ -27,7 +27,7 @@ import org.apache.ignite.internal.processors.query.calcite.util.Commons; /** * Scan node. */ -public class ScanNode<Row> extends AbstractNode<Row> implements SingleNode<Row>, AutoCloseable { +public class ScanNode<Row> extends AbstractNode<Row> implements SingleNode<Row> { /** */ private final Iterable<Row> src; @@ -54,6 +54,9 @@ public class ScanNode<Row> extends AbstractNode<Row> implements SingleNode<Row>, @Override public void request(int rowsCnt) { checkThread(); + if (isClosed()) + return; + assert rowsCnt > 0 && requested == 0; requested = rowsCnt; @@ -63,15 +66,17 @@ public class ScanNode<Row> extends AbstractNode<Row> implements SingleNode<Row>, } /** {@inheritDoc} */ - @Override public void cancel() { - checkThread(); - context().markCancelled(); - close(); - } - - /** {@inheritDoc} */ @Override public void close() { + if (isClosed()) + return; + Commons.closeQuiet(it); + + it = null; + + Commons.closeQuiet(src); + + super.close(); } /** {@inheritDoc} */ @@ -86,6 +91,9 @@ public class ScanNode<Row> extends AbstractNode<Row> implements SingleNode<Row>, /** */ private void pushInternal() { + if (isClosed()) + return; + inLoop = true; try { if (it == null) @@ -96,7 +104,7 @@ public class ScanNode<Row> extends AbstractNode<Row> implements SingleNode<Row>, Thread thread = Thread.currentThread(); while (requested > 0 && it.hasNext()) { - if (context().cancelled()) + if (isClosed()) return; if (thread.isInterrupted()) @@ -117,16 +125,27 @@ public class ScanNode<Row> extends AbstractNode<Row> implements SingleNode<Row>, downstream.end(); requested = 0; - close(); + Commons.closeQuiet(it); + + it = null; } } catch (Throwable e) { - close(); - - downstream.onError(e); + onError(e); } finally { inLoop = false; } } + + /** */ + private void onError(Throwable e) { + checkThread(); + + assert downstream != null; + + downstream.onError(e); + + close(); + } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java index 79943b3..64c236d 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java @@ -60,23 +60,28 @@ public class SortNode<Row> extends AbstractNode<Row> implements SingleNode<Row>, @Override public void request(int rowsCnt) { checkThread(); + if (isClosed()) + return; + assert !F.isEmpty(sources) && sources.size() == 1; assert rowsCnt > 0 && requested == 0; + assert waiting <= 0; requested = rowsCnt; - if (waiting == -1 && !inLoop) - context().execute(this::flushFromBuffer); - else if (waiting == 0) + if (waiting == 0) F.first(sources).request(waiting = IN_BUFFER_SIZE); - else - throw new AssertionError(); + else if (!inLoop) + context().execute(this::flushFromBuffer); } /** {@inheritDoc} */ @Override public void push(Row row) { checkThread(); + if (isClosed()) + return; + assert downstream != null; assert waiting > 0; @@ -97,6 +102,9 @@ public class SortNode<Row> extends AbstractNode<Row> implements SingleNode<Row>, @Override public void end() { checkThread(); + if (isClosed()) + return; + assert downstream != null; assert waiting > 0; @@ -123,28 +131,16 @@ public class SortNode<Row> extends AbstractNode<Row> implements SingleNode<Row>, private void flushFromBuffer() { assert waiting == -1; - inLoop = true; + int processed = 0; + inLoop = true; try { - int processed = 0; + while (requested > 0 && !rows.isEmpty()) { + requested--; - while (requested > 0) { - int toSnd = Math.min(requested, IN_BUFFER_SIZE - processed); + downstream.push(rows.poll()); - for (int i = 0; i < toSnd; i++) { - requested--; - - if (rows.isEmpty()) - break; - - Row row = rows.poll(); - - downstream.push(row); - - processed++; - } - - if (processed >= IN_BUFFER_SIZE && requested > 0) { + if (++processed >= IN_BUFFER_SIZE && requested > 0) { // allow others to do their job context().execute(this::flushFromBuffer); @@ -156,7 +152,6 @@ public class SortNode<Row> extends AbstractNode<Row> implements SingleNode<Row>, downstream.end(); requested = 0; } - } finally { inLoop = false; diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/InboxCancelMessage.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ErrorMessage.java similarity index 70% copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/InboxCancelMessage.java copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ErrorMessage.java index 933d656..de45c18 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/InboxCancelMessage.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ErrorMessage.java @@ -20,13 +20,16 @@ package org.apache.ignite.internal.processors.query.calcite.message; import java.nio.ByteBuffer; import java.util.UUID; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.GridDirectTransient; +import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; /** * */ -public class InboxCancelMessage implements ExecutionContextAware { +public class ErrorMessage implements MarshalableMessage { /** */ private UUID queryId; @@ -34,44 +37,47 @@ public class InboxCancelMessage implements ExecutionContextAware { private long fragmentId; /** */ - private long exchangeId; + private byte[] errBytes; /** */ - private int batchId; + @GridDirectTransient + private Throwable err; /** */ - public InboxCancelMessage(){} + public ErrorMessage() { + // No-op. + } /** */ - public InboxCancelMessage(UUID queryId, long fragmentId, long exchangeId, int batchId) { + public ErrorMessage(UUID queryId, long fragmentId, Throwable err) { + assert err != null; + this.queryId = queryId; this.fragmentId = fragmentId; - this.exchangeId = exchangeId; - this.batchId = batchId; + this.err = err; } - /** {@inheritDoc} */ - @Override public UUID queryId() { + /** + * @return Query ID. + */ + public UUID queryId() { return queryId; } - /** {@inheritDoc} */ - @Override public long fragmentId() { - return fragmentId; - } - /** - * @return Exchange ID. + * @return Fragment ID. */ - public long exchangeId() { - return exchangeId; + public long fragmentId() { + return fragmentId; } /** - * @return Batch ID. + * @return Marshaled Throwable. */ - public int batchId() { - return batchId; + public Throwable error() { + assert err != null; + + return err; } /** {@inheritDoc} */ @@ -87,24 +93,18 @@ public class InboxCancelMessage implements ExecutionContextAware { switch (writer.state()) { case 0: - if (!writer.writeInt("batchId", batchId)) + if (!writer.writeByteArray("errBytes", errBytes)) return false; writer.incrementState(); case 1: - if (!writer.writeLong("exchangeId", exchangeId)) - return false; - - writer.incrementState(); - - case 2: if (!writer.writeLong("fragmentId", fragmentId)) return false; writer.incrementState(); - case 3: + case 2: if (!writer.writeUuid("queryId", queryId)) return false; @@ -123,8 +123,8 @@ public class InboxCancelMessage implements ExecutionContextAware { return false; switch (reader.state()) { - case 0: - batchId = reader.readInt("batchId"); + case 0: + errBytes = reader.readByteArray("errBytes"); if (!reader.isLastRead()) return false; @@ -132,14 +132,6 @@ public class InboxCancelMessage implements ExecutionContextAware { reader.incrementState(); case 1: - exchangeId = reader.readLong("exchangeId"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 2: fragmentId = reader.readLong("fragmentId"); if (!reader.isLastRead()) @@ -147,7 +139,7 @@ public class InboxCancelMessage implements ExecutionContextAware { reader.incrementState(); - case 3: + case 2: queryId = reader.readUuid("queryId"); if (!reader.isLastRead()) @@ -157,16 +149,26 @@ public class InboxCancelMessage implements ExecutionContextAware { } - return reader.afterMessageRead(InboxCancelMessage.class); + return reader.afterMessageRead(ErrorMessage.class); } /** {@inheritDoc} */ @Override public MessageType type() { - return MessageType.QUERY_INBOX_CANCEL_MESSAGE; + return MessageType.QUERY_ERROR_MESSAGE; } /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 4; + return 3; + } + + /** {@inheritDoc} */ + @Override public void prepareMarshal(Marshaller marshaller) throws IgniteCheckedException { + errBytes = marshaller.marshal(err); + } + + /** {@inheritDoc} */ + @Override public void prepareUnmarshal(Marshaller marshaller, ClassLoader loader) throws IgniteCheckedException { + err = marshaller.unmarshal(errBytes, loader); } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/InboxCancelMessage.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/InboxCloseMessage.java similarity index 79% rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/InboxCancelMessage.java rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/InboxCloseMessage.java index 933d656..4398306 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/InboxCancelMessage.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/InboxCloseMessage.java @@ -26,7 +26,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter; /** * */ -public class InboxCancelMessage implements ExecutionContextAware { +public class InboxCloseMessage implements CalciteMessage { /** */ private UUID queryId; @@ -37,26 +37,28 @@ public class InboxCancelMessage implements ExecutionContextAware { private long exchangeId; /** */ - private int batchId; - - /** */ - public InboxCancelMessage(){} + public InboxCloseMessage() { + // No-op. + } /** */ - public InboxCancelMessage(UUID queryId, long fragmentId, long exchangeId, int batchId) { + public InboxCloseMessage(UUID queryId, long fragmentId, long exchangeId) { this.queryId = queryId; this.fragmentId = fragmentId; this.exchangeId = exchangeId; - this.batchId = batchId; } - /** {@inheritDoc} */ - @Override public UUID queryId() { + /** + * @return Query ID. + */ + public UUID queryId() { return queryId; } - /** {@inheritDoc} */ - @Override public long fragmentId() { + /** + * @return Fragment ID. + */ + public long fragmentId() { return fragmentId; } @@ -67,13 +69,6 @@ public class InboxCancelMessage implements ExecutionContextAware { return exchangeId; } - /** - * @return Batch ID. - */ - public int batchId() { - return batchId; - } - /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { writer.setBuffer(buf); @@ -87,24 +82,18 @@ public class InboxCancelMessage implements ExecutionContextAware { switch (writer.state()) { case 0: - if (!writer.writeInt("batchId", batchId)) - return false; - - writer.incrementState(); - - case 1: if (!writer.writeLong("exchangeId", exchangeId)) return false; writer.incrementState(); - case 2: + case 1: if (!writer.writeLong("fragmentId", fragmentId)) return false; writer.incrementState(); - case 3: + case 2: if (!writer.writeUuid("queryId", queryId)) return false; @@ -124,14 +113,6 @@ public class InboxCancelMessage implements ExecutionContextAware { switch (reader.state()) { case 0: - batchId = reader.readInt("batchId"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 1: exchangeId = reader.readLong("exchangeId"); if (!reader.isLastRead()) @@ -139,7 +120,7 @@ public class InboxCancelMessage implements ExecutionContextAware { reader.incrementState(); - case 2: + case 1: fragmentId = reader.readLong("fragmentId"); if (!reader.isLastRead()) @@ -147,7 +128,7 @@ public class InboxCancelMessage implements ExecutionContextAware { reader.incrementState(); - case 3: + case 2: queryId = reader.readUuid("queryId"); if (!reader.isLastRead()) @@ -157,7 +138,7 @@ public class InboxCancelMessage implements ExecutionContextAware { } - return reader.afterMessageRead(InboxCancelMessage.class); + return reader.afterMessageRead(InboxCloseMessage.class); } /** {@inheritDoc} */ @@ -167,6 +148,6 @@ public class InboxCancelMessage implements ExecutionContextAware { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 4; + return 3; } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java index 3444347..1376c00 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java @@ -21,6 +21,7 @@ import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.processors.query.calcite.util.Service; +import org.apache.ignite.marshaller.Marshaller; /** * @@ -35,10 +36,28 @@ public interface MessageService extends Service { void send(UUID nodeId, CalciteMessage msg) throws IgniteCheckedException; /** + * Checks whether a node with given ID is alive. + * + * @param nodeId Node ID. + * @return {@code True} if node is alive. + */ + boolean alive(UUID nodeId); + + /** * Registers a listener for messages of a given type. * * @param lsnr Listener. * @param type Message type. */ void register(MessageListener lsnr, MessageType type); + + /** + * @return Message marshaller. + */ + Marshaller marshaller(); + + /** + * @return Message class loader. + */ + ClassLoader classLoader(); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java index e8c3caa..1061c6e 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java @@ -27,6 +27,7 @@ import org.apache.ignite.failure.FailureContext; import org.apache.ignite.failure.FailureType; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.GridTopic; +import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; import org.apache.ignite.internal.managers.communication.GridIoManager; import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.managers.communication.GridMessageListener; @@ -105,10 +106,8 @@ public class MessageServiceImpl extends AbstractService implements MessageServic this.classLoader = classLoader; } - /** - * @return Class loader. - */ - public ClassLoader classLoader() { + /** {@inheritDoc} */ + @Override public ClassLoader classLoader() { return classLoader; } @@ -133,10 +132,8 @@ public class MessageServiceImpl extends AbstractService implements MessageServic this.marsh = marsh; } - /** - * @return Marshaller. - */ - public Marshaller marshaller() { + /** {@inheritDoc} */ + @Override public Marshaller marshaller() { return marsh; } @@ -211,6 +208,16 @@ public class MessageServiceImpl extends AbstractService implements MessageServic assert old == null : old; } + /** {@inheritDoc} */ + @Override public boolean alive(UUID nodeId) { + try { + return !ioManager().checkNodeLeft(nodeId, null, false); + } + catch (IgniteClientDisconnectedCheckedException e) { + throw new AssertionError(e); + } + } + /** */ protected void prepareMarshal(Message msg) throws IgniteCheckedException { try { diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java index 9c1446c..6979b06 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java @@ -28,11 +28,13 @@ import org.apache.ignite.internal.processors.query.calcite.prepare.FragmentDescr public enum MessageType { QUERY_START_REQUEST(300, QueryStartRequest::new), QUERY_START_RESPONSE(301, QueryStartResponse::new), - QUERY_CANCEL_REQUEST(302, QueryCancelRequest::new), + QUERY_ERROR_MESSAGE(302, ErrorMessage::new), QUERY_BATCH_MESSAGE(303, QueryBatchMessage::new), QUERY_ACKNOWLEDGE_MESSAGE(304, QueryBatchAcknowledgeMessage::new), - QUERY_INBOX_CANCEL_MESSAGE(305, InboxCancelMessage::new), - GENERIC_ROW_MESSAGE(306, GenericRowMessage::new), + QUERY_INBOX_CANCEL_MESSAGE(305, InboxCloseMessage::new), + QUERY_OUTBOX_CANCEL_MESSAGE(306, OutboxCloseMessage::new), + GENERIC_ROW_MESSAGE(307, GenericRowMessage::new), + NODES_MAPPING(350, NodesMapping::new), FRAGMENT_DESCRIPTION(351, FragmentDescription::new); diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryCancelRequest.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/OutboxCloseMessage.java similarity index 62% rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryCancelRequest.java rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/OutboxCloseMessage.java index 0a3766e..ba6021c 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryCancelRequest.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/OutboxCloseMessage.java @@ -26,16 +26,26 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter; /** * */ -public class QueryCancelRequest implements CalciteMessage { +public class OutboxCloseMessage implements CalciteMessage { /** */ private UUID queryId; /** */ - QueryCancelRequest(){} + private long fragmentId; /** */ - public QueryCancelRequest(UUID queryId) { + private long exchangeId; + + /** */ + public OutboxCloseMessage() { + // No-op. + } + + /** */ + public OutboxCloseMessage(UUID queryId, long fragmentId, long exchangeId) { this.queryId = queryId; + this.fragmentId = fragmentId; + this.exchangeId = exchangeId; } /** @@ -45,6 +55,20 @@ public class QueryCancelRequest implements CalciteMessage { return queryId; } + /** + * @return Fragment ID. + */ + public long fragmentId() { + return fragmentId; + } + + /** + * @return Exchange ID. + */ + public long exchangeId() { + return exchangeId; + } + /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { writer.setBuffer(buf); @@ -58,6 +82,18 @@ public class QueryCancelRequest implements CalciteMessage { switch (writer.state()) { case 0: + if (!writer.writeLong("exchangeId", exchangeId)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeLong("fragmentId", fragmentId)) + return false; + + writer.incrementState(); + + case 2: if (!writer.writeUuid("queryId", queryId)) return false; @@ -77,6 +113,22 @@ public class QueryCancelRequest implements CalciteMessage { switch (reader.state()) { case 0: + exchangeId = reader.readLong("exchangeId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + fragmentId = reader.readLong("fragmentId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 2: queryId = reader.readUuid("queryId"); if (!reader.isLastRead()) @@ -86,16 +138,16 @@ public class QueryCancelRequest implements CalciteMessage { } - return reader.afterMessageRead(QueryCancelRequest.class); + return reader.afterMessageRead(OutboxCloseMessage.class); } /** {@inheritDoc} */ @Override public MessageType type() { - return MessageType.QUERY_CANCEL_REQUEST; + return MessageType.QUERY_OUTBOX_CANCEL_MESSAGE; } /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 1; + return 3; } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartRequest.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartRequest.java index 2d3c6b2..c5aacc8 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartRequest.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartRequest.java @@ -31,7 +31,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter; /** * */ -public class QueryStartRequest implements MarshalableMessage { +public class QueryStartRequest implements MarshalableMessage, ExecutionContextAware { /** */ private String schema; @@ -76,13 +76,16 @@ public class QueryStartRequest implements MarshalableMessage { return schema; } - /** - * @return Query ID. - */ - public UUID queryId() { + /** {@inheritDoc} */ + @Override public UUID queryId() { return qryId; } + /** {@inheritDoc} */ + @Override public long fragmentId() { + return fragmentDesc.fragmentId(); + } + /** * @return Fragment description. */ diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RemoteException.java similarity index 50% copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RemoteException.java index 3444347..9327034 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RemoteException.java @@ -15,30 +15,54 @@ * limitations under the License. */ -package org.apache.ignite.internal.processors.query.calcite.message; +package org.apache.ignite.internal.processors.query.calcite.metadata; import java.util.UUID; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.processors.query.calcite.util.Service; - /** * */ -public interface MessageService extends Service { +public class RemoteException extends RuntimeException { + /** */ + private final UUID nodeId; + + /** */ + private final UUID queryId; + + /** */ + private final long fragmentId; + /** - * Sends a message to given node. - * + * @param cause Cause. * @param nodeId Node ID. - * @param msg Message. + * @param queryId Query ID. + * @param fragmentId Fragment ID. + */ + public RemoteException(UUID nodeId, UUID queryId, long fragmentId, Throwable cause) { + super("Remote query execution", cause); + this.nodeId = nodeId; + this.queryId = queryId; + this.fragmentId = fragmentId; + } + + /** + * @return Node ID. + */ + public UUID nodeId() { + return nodeId; + } + + /** + * @return Query ID. */ - void send(UUID nodeId, CalciteMessage msg) throws IgniteCheckedException; + public UUID queryId() { + return queryId; + } /** - * Registers a listener for messages of a given type. - * - * @param lsnr Listener. - * @param type Message type. + * @return Fragment ID. */ - void register(MessageListener lsnr, MessageType type); + public long fragmentId() { + return fragmentId; + } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java index 28ce580..551a816 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java @@ -207,6 +207,7 @@ class Cloner implements IgniteRelVisitor<IgniteRel> { this.root = root; } + /** */ Fragment build() { return new Fragment(id, root, remotes.build()); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java index a53b357..214dcc9 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java @@ -216,20 +216,6 @@ public final class Commons { U.closeQuiet((AutoCloseable) o); } - /** - * @param o Object to close. - * @param e Exception, what causes close. - */ - public static void closeQuiet(Object o, @Nullable Exception e) { - if (!(o instanceof AutoCloseable)) - return; - - if (e != null) - U.closeWithSuppressingException((AutoCloseable) o, e); - else - U.closeQuiet((AutoCloseable) o); - } - /** */ public static RelDataType combinedRowType(IgniteTypeFactory typeFactory, RelDataType... types) { RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(typeFactory); diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CancelTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CancelTest.java new file mode 100644 index 0000000..fd83b3b --- /dev/null +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CancelTest.java @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.internal.processors.query.calcite; + +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.cache.query.FieldsQueryCursor; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.query.IgniteSQLException; +import org.apache.ignite.internal.processors.query.QueryEngine; +import org.apache.ignite.internal.processors.query.calcite.util.Commons; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; + +import static java.util.Collections.singletonList; + +/** + * Cancel query test. + */ +public class CancelTest extends GridCommonAbstractTest { + /** Partition release timeout. */ + private static final long PART_RELEASE_TIMEOUT = 5000L; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + startGrids(2); + + IgniteCache<Integer, String> c = grid(0).cache("TEST"); + + fillCache(c, 5000); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + super.afterTest(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + QueryEntity ePart = new QueryEntity() + .setTableName("TEST") + .setKeyType(Integer.class.getName()) + .setValueType(String.class.getName()) + .setKeyFieldName("id") + .setValueFieldName("val") + .addQueryField("id", Integer.class.getName(), null) + .addQueryField("val", String.class.getName(), null);; + + return super.getConfiguration(igniteInstanceName) + .setCacheConfiguration( + new CacheConfiguration<>(ePart.getTableName()) + .setAffinity(new RendezvousAffinityFunction(false, 8)) + .setCacheMode(CacheMode.PARTITIONED) + .setQueryEntities(singletonList(ePart)) + .setSqlSchema("PUBLIC")); + } + + /** + * + */ + @Test + public void testCancel() throws Exception { + QueryEngine engine = Commons.lookupComponent(grid(0).context(), QueryEngine.class); + + List<FieldsQueryCursor<List<?>>> cursors = + engine.query(null, "PUBLIC", + "SELECT * FROM TEST", + X.EMPTY_OBJECT_ARRAY); + + Iterator<List<?>> it = cursors.get(0).iterator(); + + it.next(); + + cursors.forEach(QueryCursor::close); + + GridTestUtils.assertThrows(log, () -> { + it.next(); + + return null; + }, + IgniteSQLException.class, "The query was cancelled while executing" + ); + + awaitReservationsRelease("TEST"); + } + + /** + * + */ + @Test + public void testNotOriginatorNodeStop() throws Exception { + QueryEngine engine = Commons.lookupComponent(grid(0).context(), QueryEngine.class); + + List<FieldsQueryCursor<List<?>>> cursors = + engine.query(null, "PUBLIC", + "SELECT * FROM TEST", + X.EMPTY_OBJECT_ARRAY); + + Iterator<List<?>> it = cursors.get(0).iterator(); + + it.next(); + + stopGrid(1); + + GridTestUtils.assertThrowsAnyCause(log, () -> { + while (it.hasNext()) + it.next(); + + return null; + }, + ClusterTopologyCheckedException.class, "Node left" + ); + + awaitReservationsRelease(grid(0), "TEST"); + } + + /** + * + */ + @Test + public void testOriginatorNodeStop() throws Exception { + QueryEngine engine = Commons.lookupComponent(grid(0).context(), QueryEngine.class); + + List<FieldsQueryCursor<List<?>>> cursors = + engine.query(null, "PUBLIC", + "SELECT * FROM TEST", + X.EMPTY_OBJECT_ARRAY); + + Iterator<List<?>> it = cursors.get(0).iterator(); + + it.next(); + + stopGrid(0); + + awaitReservationsRelease(grid(1), "TEST"); + } + + /** + * + */ + @Test + public void testReadToEnd() throws Exception { + QueryEngine engine = Commons.lookupComponent(grid(0).context(), QueryEngine.class); + + List<FieldsQueryCursor<List<?>>> cursors = + engine.query(null, "PUBLIC", + "SELECT * FROM TEST WHERE ID < 1", + X.EMPTY_OBJECT_ARRAY); + + Iterator<List<?>> it = cursors.get(0).iterator(); + + it.next(); + + GridTestUtils.assertThrows(log, () -> { + it.next(); + + return null; + }, + NoSuchElementException.class, null + ); + + GridTestUtils.assertThrows(log, () -> { + it.next(); + + return null; + }, + NoSuchElementException.class, null + ); + + // Checks that all partition are unreserved. + awaitReservationsRelease("TEST"); + } + + /** + * + */ + @Test + public void testFullReadToEnd() throws Exception { + QueryEngine engine = Commons.lookupComponent(grid(0).context(), QueryEngine.class); + + List<FieldsQueryCursor<List<?>>> cursors = + engine.query(null, "PUBLIC", + "SELECT * FROM TEST WHERE ID < 1", + X.EMPTY_OBJECT_ARRAY); + + cursors.get(0).getAll(); + + // Checks that all partition are unreserved. + awaitReservationsRelease("TEST"); + } + + /** + * @param c Cache. + * @param rows Rows count. + */ + private void fillCache(IgniteCache c, int rows) throws InterruptedException { + c.clear(); + + for (int i = 0; i < rows; ++i) + c.put(i, "val_" + i); + + awaitPartitionMapExchange(); + } + + /** + * + */ + private void startNewNode() throws Exception { + startGrid(2); + + awaitPartitionMapExchange(); + } + + /** + * @param cacheName Cache to check + * @throws IgniteInterruptedCheckedException + */ + void awaitReservationsRelease(String cacheName) throws IgniteInterruptedCheckedException { + for (Ignite ign : G.allGrids()) + awaitReservationsRelease((IgniteEx)ign, "TEST"); + } + + /** + * @param node Node to check reservation. + * @param cacheName Cache to check reservations. + */ + void awaitReservationsRelease(IgniteEx node, String cacheName) throws IgniteInterruptedCheckedException { + GridDhtAtomicCache c = GridTestUtils.getFieldValue(node.cachex(cacheName), "delegate"); + + List<GridDhtLocalPartition> parts = c.topology().localPartitions(); + + GridTestUtils.waitForCondition(() -> { + for (GridDhtLocalPartition p : parts) { + if (p.reservations() > 0) + return false; + } + + return true; + }, PART_RELEASE_TIMEOUT); + + for (GridDhtLocalPartition p : parts) + assertEquals("Partition is reserved: " + p, 0, p.reservations()); + } +} diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/PlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/PlannerTest.java index 7968f0a..aed1f7c 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/PlannerTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/PlannerTest.java @@ -1323,7 +1323,7 @@ public class PlannerTest extends GridCommonAbstractTest { exec = new LogicalRelImplementor<>(ectx, c1 -> r1 -> 0, mailboxRegistry, exchangeSvc, new TestFailureProcessor(kernal)).go(fragment.root()); - RootNode<Object[]> consumer = new RootNode<>(ectx, r -> {}); + RootNode<Object[]> consumer = new RootNode<>(ectx); consumer.register(exec); //// Remote part @@ -1568,7 +1568,7 @@ public class PlannerTest extends GridCommonAbstractTest { exec = new LogicalRelImplementor<>(ectx, c1 -> r1 -> 0, mailboxRegistry, exchangeSvc, new TestFailureProcessor(kernal)).go(fragment.root()); - RootNode<Object[]> consumer = new RootNode<>(ectx, r -> {}); + RootNode<Object[]> consumer = new RootNode<>(ectx); consumer.register(exec); //// Remote part @@ -2780,6 +2780,11 @@ public class PlannerTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ + @Override public boolean alive(UUID nodeId) { + return true; + } + + /** {@inheritDoc} */ @Override protected void prepareMarshal(Message msg) { // No-op; } diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java index c3c7f9a..c2c625c 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java @@ -180,6 +180,11 @@ public class AbstractExecutionTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ + @Override public boolean alive(UUID nodeId) { + return true; + } + + /** {@inheritDoc} */ @Override protected void prepareMarshal(Message msg) { // No-op; } diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ContinuousExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ContinuousExecutionTest.java index cf4f8c5..ae07e61 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ContinuousExecutionTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ContinuousExecutionTest.java @@ -142,7 +142,7 @@ public class ContinuousExecutionTest extends AbstractExecutionTest { inbox.init(ectx, nodes.subList(1, nodes.size()), null); - RootNode<Object[]> node = new RootNode<>(ectx, r -> {}); + RootNode<Object[]> node = new RootNode<>(ectx); node.register(inbox); diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java index 21bd420..086225a 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java @@ -96,7 +96,7 @@ public class ExecutionTest extends AbstractExecutionTest { FilterNode<Object[]> filter = new FilterNode<>(ctx, r -> (Integer) r[0] >= 2); filter.register(project); - RootNode<Object[]> node = new RootNode<>(ctx, r -> {}); + RootNode<Object[]> node = new RootNode<>(ctx); node.register(filter); assert node.hasNext(); @@ -140,7 +140,7 @@ public class ExecutionTest extends AbstractExecutionTest { UnionAllNode<Object[]> union = new UnionAllNode<>(ctx); union.register(F.asList(scan1, scan2, scan3)); - RootNode<Object[]> root = new RootNode<>(ctx, c -> {}); + RootNode<Object[]> root = new RootNode<>(ctx); root.register(union); assertTrue(root.hasNext()); @@ -193,7 +193,7 @@ public class ExecutionTest extends AbstractExecutionTest { ProjectNode<Object[]> project = new ProjectNode<>(ctx, r -> new Object[]{r[0], r[1], r[4]}); project.register(join); - RootNode<Object[]> node = new RootNode<>(ctx, r -> {}); + RootNode<Object[]> node = new RootNode<>(ctx); node.register(project); assert node.hasNext(); @@ -252,7 +252,7 @@ public class ExecutionTest extends AbstractExecutionTest { ProjectNode<Object[]> project = new ProjectNode<>(ctx, r -> new Object[]{r[2], r[3], r[1]}); project.register(join); - RootNode<Object[]> node = new RootNode<>(ctx, r -> {}); + RootNode<Object[]> node = new RootNode<>(ctx); node.register(project); assert node.hasNext(); @@ -320,7 +320,7 @@ public class ExecutionTest extends AbstractExecutionTest { ProjectNode<Object[]> project = new ProjectNode<>(ctx, r -> new Object[]{r[0], r[1], r[4]}); project.register(join); - RootNode<Object[]> node = new RootNode<>(ctx, r -> {}); + RootNode<Object[]> node = new RootNode<>(ctx); node.register(project); assert node.hasNext(); @@ -371,7 +371,7 @@ public class ExecutionTest extends AbstractExecutionTest { ProjectNode<Object[]> project = new ProjectNode<>(ctx, r -> new Object[]{r[1]}); project.register(join); - RootNode<Object[]> node = new RootNode<>(ctx, r -> {}); + RootNode<Object[]> node = new RootNode<>(ctx); node.register(project); assert node.hasNext(); @@ -419,7 +419,7 @@ public class ExecutionTest extends AbstractExecutionTest { ProjectNode<Object[]> project = new ProjectNode<>(ctx, r -> new Object[]{r[1]}); project.register(join); - RootNode<Object[]> node = new RootNode<>(ctx, r -> {}); + RootNode<Object[]> node = new RootNode<>(ctx); node.register(project); assert node.hasNext(); @@ -471,7 +471,7 @@ public class ExecutionTest extends AbstractExecutionTest { AggregateNode<Object[]> reduce = new AggregateNode<>(ctx, REDUCE, grpSets, accFactory(ctx, call, REDUCE, rowType), rowFactory()); reduce.register(map); - RootNode<Object[]> root = new RootNode<>(ctx, c -> {}); + RootNode<Object[]> root = new RootNode<>(ctx); root.register(reduce); assertTrue(root.hasNext()); @@ -517,7 +517,7 @@ public class ExecutionTest extends AbstractExecutionTest { AggregateNode<Object[]> reduce = new AggregateNode<>(ctx, REDUCE, grpSets, accFactory(ctx, call, REDUCE, rowType), rowFactory()); reduce.register(map); - RootNode<Object[]> root = new RootNode<>(ctx, c -> {}); + RootNode<Object[]> root = new RootNode<>(ctx); root.register(reduce); assertTrue(root.hasNext()); @@ -562,7 +562,7 @@ public class ExecutionTest extends AbstractExecutionTest { AggregateNode<Object[]> reduce = new AggregateNode<>(ctx, REDUCE, grpSets, accFactory(ctx, call, REDUCE, rowType), rowFactory()); reduce.register(map); - RootNode<Object[]> root = new RootNode<>(ctx, c -> {}); + RootNode<Object[]> root = new RootNode<>(ctx); root.register(reduce); assertTrue(root.hasNext()); @@ -607,7 +607,7 @@ public class ExecutionTest extends AbstractExecutionTest { AggregateNode<Object[]> reduce = new AggregateNode<>(ctx, REDUCE, grpSets, accFactory(ctx, call, REDUCE, rowType), rowFactory()); reduce.register(map); - RootNode<Object[]> root = new RootNode<>(ctx, c -> {}); + RootNode<Object[]> root = new RootNode<>(ctx); root.register(reduce); assertTrue(root.hasNext()); @@ -652,7 +652,7 @@ public class ExecutionTest extends AbstractExecutionTest { AggregateNode<Object[]> reduce = new AggregateNode<>(ctx, REDUCE, grpSets, accFactory(ctx, call, REDUCE, rowType), rowFactory()); reduce.register(map); - RootNode<Object[]> root = new RootNode<>(ctx, c -> {}); + RootNode<Object[]> root = new RootNode<>(ctx); root.register(reduce); assertTrue(root.hasNext()); @@ -694,7 +694,7 @@ public class ExecutionTest extends AbstractExecutionTest { AggregateNode<Object[]> agg = new AggregateNode<>(ctx, SINGLE, grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory()); agg.register(scan); - RootNode<Object[]> root = new RootNode<>(ctx, c -> {}); + RootNode<Object[]> root = new RootNode<>(ctx); root.register(agg); assertTrue(root.hasNext()); @@ -738,7 +738,7 @@ public class ExecutionTest extends AbstractExecutionTest { AggregateNode<Object[]> agg = new AggregateNode<>(ctx, SINGLE, grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory()); agg.register(scan); - RootNode<Object[]> root = new RootNode<>(ctx, c -> {}); + RootNode<Object[]> root = new RootNode<>(ctx); root.register(agg); assertTrue(root.hasNext()); @@ -780,7 +780,7 @@ public class ExecutionTest extends AbstractExecutionTest { AggregateNode<Object[]> agg = new AggregateNode<>(ctx, SINGLE, grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory()); agg.register(scan); - RootNode<Object[]> root = new RootNode<>(ctx, c -> {}); + RootNode<Object[]> root = new RootNode<>(ctx); root.register(agg); assertTrue(root.hasNext()); @@ -822,7 +822,7 @@ public class ExecutionTest extends AbstractExecutionTest { AggregateNode<Object[]> agg = new AggregateNode<>(ctx, SINGLE, grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory()); agg.register(scan); - RootNode<Object[]> root = new RootNode<>(ctx, c -> {}); + RootNode<Object[]> root = new RootNode<>(ctx); root.register(agg); assertTrue(root.hasNext()); @@ -864,7 +864,7 @@ public class ExecutionTest extends AbstractExecutionTest { AggregateNode<Object[]> agg = new AggregateNode<>(ctx, SINGLE, grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory()); agg.register(scan); - RootNode<Object[]> root = new RootNode<>(ctx, c -> {}); + RootNode<Object[]> root = new RootNode<>(ctx); root.register(agg); assertTrue(root.hasNext()); @@ -907,7 +907,7 @@ public class ExecutionTest extends AbstractExecutionTest { AggregateNode<Object[]> agg = new AggregateNode<>(ctx, SINGLE, grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory()); agg.register(scan); - RootNode<Object[]> root = new RootNode<>(ctx, c -> {}); + RootNode<Object[]> root = new RootNode<>(ctx); root.register(agg); assertTrue(root.hasNext()); diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java index 26b66c9..912d8dc 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java +++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java @@ -19,6 +19,7 @@ package org.apache.ignite.testsuites; import org.apache.ignite.internal.processors.query.calcite.CalciteBasicSecondaryIndexIntegrationTest; import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessorTest; +import org.apache.ignite.internal.processors.query.calcite.CancelTest; import org.apache.ignite.internal.processors.query.calcite.PlannerTest; import org.apache.ignite.internal.processors.query.calcite.exec.ClosableIteratorsHolderTest; import org.apache.ignite.internal.processors.query.calcite.exec.rel.ContinuousExecutionTest; @@ -40,7 +41,8 @@ import org.junit.runners.Suite; ContinuousExecutionTest.class, CalciteQueryProcessorTest.class, JdbcQueryTest.class, - CalciteBasicSecondaryIndexIntegrationTest.class + CalciteBasicSecondaryIndexIntegrationTest.class, + CancelTest.class }) public class IgniteCalciteTestSuite { }