This is an automated email from the ASF dual-hosted git repository. agoncharuk pushed a commit to branch sql-calcite in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/sql-calcite by this push: new a823f50 IGNITE-13973 Fix query hang if an assertion is thrown during the query execution - Fixes #8709. a823f50 is described below commit a823f507d6e6893a53ac7a01e10943949cfa3384 Author: korlov42 <kor...@gridgain.com> AuthorDate: Mon Feb 8 17:10:59 2021 +0300 IGNITE-13973 Fix query hang if an assertion is thrown during the query execution - Fixes #8709. Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com> --- .../query/calcite/exec/ExchangeServiceImpl.java | 24 ++- .../query/calcite/exec/ExecutionContext.java | 20 ++- .../query/calcite/exec/ExecutionServiceImpl.java | 7 +- .../query/calcite/exec/LogicalRelImplementor.java | 5 +- .../query/calcite/exec/rel/AbstractNode.java | 4 +- .../query/calcite/exec/rel/AggregateNode.java | 68 +++---- .../exec/rel/CorrelatedNestedLoopJoinNode.java | 132 ++++++-------- .../query/calcite/exec/rel/Downstream.java | 6 +- .../query/calcite/exec/rel/FilterNode.java | 60 +++---- .../processors/query/calcite/exec/rel/Inbox.java | 66 +++---- .../query/calcite/exec/rel/IndexSpoolNode.java | 51 ++---- .../query/calcite/exec/rel/LimitNode.java | 37 ++-- .../query/calcite/exec/rel/MergeJoinNode.java | 106 ++++------- .../query/calcite/exec/rel/ModifyNode.java | 67 +++---- .../query/calcite/exec/rel/NestedLoopJoinNode.java | 108 ++++-------- .../processors/query/calcite/exec/rel/Node.java | 3 +- .../processors/query/calcite/exec/rel/Outbox.java | 50 ++---- .../query/calcite/exec/rel/ProjectNode.java | 33 +--- .../query/calcite/exec/rel/RootNode.java | 16 +- .../query/calcite/exec/rel/ScanNode.java | 32 ++-- .../query/calcite/exec/rel/SortNode.java | 65 +++---- .../query/calcite/exec/rel/TableSpoolNode.java | 59 +++---- .../query/calcite/exec/rel/UnionAllNode.java | 43 ++--- .../query/calcite/message/ErrorMessage.java | 3 +- .../CalciteErrorHandlilngIntegrationTest.java | 196 +++++++++++++++++++++ .../calcite/exec/rel/ContinuousExecutionTest.java | 2 +- 
.../query/calcite/exec/rel/ExecutionTest.java | 85 +++++++++ .../query/calcite/planner/PlannerTest.java | 10 +- .../ignite/testsuites/IgniteCalciteTestSuite.java | 2 + .../processors/query/GridQueryProcessor.java | 2 +- modules/core/src/test/config/log4j-test.xml | 4 + 31 files changed, 696 insertions(+), 670 deletions(-) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java index 6ed92b6..2da1b9d 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java @@ -159,7 +159,7 @@ public class ExchangeServiceImpl extends AbstractService implements ExchangeServ if (!F.isEmpty(inboxes)) { for (Inbox<?> inbox : inboxes) { inbox.context().cancel(); - inbox.context().execute(inbox::close); + inbox.context().execute(inbox::close, inbox::onError); } } else if (log.isDebugEnabled()) { @@ -177,7 +177,7 @@ public class ExchangeServiceImpl extends AbstractService implements ExchangeServ if (!F.isEmpty(outboxes)) { for (Outbox<?> outbox : outboxes) { outbox.context().cancel(); - outbox.context().execute(outbox::close); + outbox.context().execute(outbox::close, outbox::onError); } } else if (log.isDebugEnabled()) { @@ -193,8 +193,14 @@ public class ExchangeServiceImpl extends AbstractService implements ExchangeServ protected void onMessage(UUID nodeId, QueryBatchAcknowledgeMessage msg) { Outbox<?> outbox = mailboxRegistry().outbox(msg.queryId(), msg.exchangeId()); - if (outbox != null) - outbox.onAcknowledge(nodeId, msg.batchId()); + if (outbox != null) { + try { + outbox.onAcknowledge(nodeId, msg.batchId()); + } + catch (Throwable t) { + outbox.onError(t); + } + } else if (log.isDebugEnabled()) { log.debug("Stale 
acknowledge message received: [" + "nodeId=" + nodeId + ", " + @@ -218,8 +224,14 @@ public class ExchangeServiceImpl extends AbstractService implements ExchangeServ inbox = mailboxRegistry().register(newInbox); } - if (inbox != null) - inbox.onBatchReceived(nodeId, msg.batchId(), msg.last(), Commons.cast(msg.rows())); + if (inbox != null) { + try { + inbox.onBatchReceived(nodeId, msg.batchId(), msg.last(), Commons.cast(msg.rows())); + } + catch (Throwable t) { + inbox.onError(t); + } + } else if (log.isDebugEnabled()) { log.debug("Stale batch message received: [" + "nodeId=" + nodeId + ", " + diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java index 936a782..23309f0 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java @@ -22,6 +22,8 @@ import java.util.Map; import java.util.TimeZone; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; + import org.apache.calcite.DataContext; import org.apache.calcite.linq4j.QueryProvider; import org.apache.calcite.schema.SchemaPlus; @@ -226,8 +228,22 @@ public class ExecutionContext<Row> implements DataContext { * * @param task Query task. 
*/ - public void execute(Runnable task) { - executor.execute(qryId, fragmentId(), task); + public void execute(RunnableX task, Consumer<Throwable> onError) { + executor.execute(qryId, fragmentId(), () -> { + try { + task.run(); + } + catch (Throwable t) { + onError.accept(t); + } + }); + } + + /** */ + @FunctionalInterface + public interface RunnableX { + /** */ + void run() throws Exception; } /** diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java index f77273a..ef5dfb3 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java @@ -27,6 +27,7 @@ import java.util.Objects; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; + import com.google.common.collect.ImmutableList; import org.apache.calcite.plan.Context; import org.apache.calcite.plan.Contexts; @@ -718,7 +719,7 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut plan.target(fragment), plan.remotes(fragment)); - Exception ex = null; + Throwable ex = null; for (UUID nodeId : fragmentDesc.nodeIds()) { if (ex != null) info.onResponse(nodeId, fragment.fragmentId(), ex); @@ -734,7 +735,7 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut messageService().send(nodeId, req); } - catch (Exception e) { + catch (Throwable e) { info.onResponse(nodeId, fragment.fragmentId(), ex = e); } } @@ -770,7 +771,7 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut failureProcessor()) .go(plan.root()); } - catch (Exception ex) { + catch (Throwable ex) { U.error(log, "Failed to build execution tree. 
", ex); mailboxRegistry.outboxes(qryId, frId, -1) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java index 823c236..e9a1ec3 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java @@ -96,6 +96,9 @@ import static org.apache.ignite.internal.processors.query.calcite.util.TypeUtils @SuppressWarnings("TypeMayBeWeakened") public class LogicalRelImplementor<Row> implements IgniteRelVisitor<Node<Row>> { /** */ + public static final String CNLJ_SUPPORTS_ONLY_INNER_ASSERTION_MSG = "only INNER join supported by IgniteCorrelatedNestedLoop"; + + /** */ private final ExecutionContext<Row> ctx; /** */ @@ -223,7 +226,7 @@ public class LogicalRelImplementor<Row> implements IgniteRelVisitor<Node<Row>> { RelDataType rowType = combinedRowType(ctx.getTypeFactory(), leftType, rightType); Predicate<Row> cond = expressionFactory.predicate(rel.getCondition(), rowType); - assert rel.getJoinType() == JoinRelType.INNER; // TODO LEFT, SEMI, ANTI + assert rel.getJoinType() == JoinRelType.INNER : CNLJ_SUPPORTS_ONLY_INNER_ASSERTION_MSG; Node<Row> node = new CorrelatedNestedLoopJoinNode<>(ctx, outType, cond, rel.getVariablesSet()); diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java index 41d2181..409d633 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java @@ 
-20,8 +20,8 @@ package org.apache.ignite.internal.processors.query.calcite.exec.rel; import java.util.Collection; import java.util.Comparator; import java.util.List; + import org.apache.calcite.rel.type.RelDataType; -import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionCancelledException; @@ -178,7 +178,7 @@ public abstract class AbstractNode<Row> implements Node<Row> { } /** */ - protected void checkState() throws IgniteCheckedException { + protected void checkState() throws Exception { if (context().isCancelled()) throw new ExecutionCancelledException(); if (Thread.interrupted()) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AggregateNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AggregateNode.java index 7e242dd..70cc79d 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AggregateNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AggregateNode.java @@ -25,9 +25,9 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.function.Supplier; + import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.util.ImmutableBitSet; -import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler; @@ -98,62 +98,47 @@ public class AggregateNode<Row> extends AbstractNode<Row> implements SingleNode< } /** {@inheritDoc} */ - @Override public void request(int rowsCnt) { + @Override public void request(int rowsCnt) throws Exception { assert 
!F.isEmpty(sources()) && sources().size() == 1; assert rowsCnt > 0 && requested == 0; assert waiting <= 0; - try { - checkState(); + checkState(); - requested = rowsCnt; + requested = rowsCnt; - if (waiting == 0) - source().request(waiting = IN_BUFFER_SIZE); - else if (!inLoop) - context().execute(this::doFlush); - } - catch (Exception e) { - onError(e); - } + if (waiting == 0) + source().request(waiting = IN_BUFFER_SIZE); + else if (!inLoop) + context().execute(this::doFlush, this::onError); } /** {@inheritDoc} */ - @Override public void push(Row row) { + @Override public void push(Row row) throws Exception { assert downstream() != null; assert waiting > 0; - try { - checkState(); + checkState(); - waiting--; + waiting--; - for (Grouping grouping : groupings) - grouping.add(row); + for (Grouping grouping : groupings) + grouping.add(row); - if (waiting == 0) - source().request(waiting = IN_BUFFER_SIZE); - } - catch (Exception e) { - onError(e); - } + if (waiting == 0) + source().request(waiting = IN_BUFFER_SIZE); } /** {@inheritDoc} */ - @Override public void end() { + @Override public void end() throws Exception { assert downstream() != null; assert waiting > 0; - try { - checkState(); + checkState(); - waiting = -1; + waiting = -1; - flush(); - } - catch (Exception e) { - onError(e); - } + flush(); } /** {@inheritDoc} */ @@ -172,19 +157,14 @@ public class AggregateNode<Row> extends AbstractNode<Row> implements SingleNode< } /** */ - private void doFlush() { - try { - checkState(); + private void doFlush() throws Exception { + checkState(); - flush(); - } - catch (Exception e) { - onError(e); - } + flush(); } /** */ - private void flush() throws IgniteCheckedException { + private void flush() throws Exception { assert waiting == -1; int processed = 0; @@ -208,7 +188,7 @@ public class AggregateNode<Row> extends AbstractNode<Row> implements SingleNode< if (processed >= IN_BUFFER_SIZE && requested > 0) { // allow others to do their job - 
context().execute(this::doFlush); + context().execute(this::doFlush, this::onError); return; } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java index 741bcc4..72c8de5 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java @@ -22,9 +22,9 @@ import java.util.Collections; import java.util.List; import java.util.Set; import java.util.function.Predicate; + import org.apache.calcite.rel.core.CorrelationId; import org.apache.calcite.rel.type.RelDataType; -import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler; import org.apache.ignite.internal.util.typedef.F; @@ -96,20 +96,15 @@ public class CorrelatedNestedLoopJoinNode<Row> extends AbstractNode<Row> { } /** {@inheritDoc} */ - @Override public void request(int rowsCnt) { + @Override public void request(int rowsCnt) throws Exception { assert !F.isEmpty(sources()) && sources().size() == 2; assert rowsCnt > 0 && requested == 0; - try { - checkState(); + checkState(); - requested = rowsCnt; + requested = rowsCnt; - onRequest(); - } - catch (Exception e) { - onError(e); - } + onRequest(); } /** {@inheritDoc} */ @@ -132,12 +127,12 @@ public class CorrelatedNestedLoopJoinNode<Row> extends AbstractNode<Row> { if (idx == 0) return new Downstream<Row>() { /** {@inheritDoc} */ - @Override public void push(Row row) { + @Override public void push(Row row) throws Exception { pushLeft(row); } /** {@inheritDoc} */ - @Override public void end() { + @Override public 
void end() throws Exception { endLeft(); } @@ -149,12 +144,12 @@ public class CorrelatedNestedLoopJoinNode<Row> extends AbstractNode<Row> { else if (idx == 1) return new Downstream<Row>() { /** {@inheritDoc} */ - @Override public void push(Row row) { + @Override public void push(Row row) throws Exception { pushRight(row); } /** {@inheritDoc} */ - @Override public void end() { + @Override public void end() throws Exception { endRight(); } @@ -168,91 +163,71 @@ public class CorrelatedNestedLoopJoinNode<Row> extends AbstractNode<Row> { } /** */ - private void pushLeft(Row row) { + private void pushLeft(Row row) throws Exception { assert downstream() != null; assert waitingLeft > 0; - try { - checkState(); + checkState(); - waitingLeft--; + waitingLeft--; - if (leftInBuf == null) - leftInBuf = new ArrayList<>(leftInBufferSize); + if (leftInBuf == null) + leftInBuf = new ArrayList<>(leftInBufferSize); - leftInBuf.add(row); + leftInBuf.add(row); - onPushLeft(); - } - catch (Exception e) { - onError(e); - } + onPushLeft(); } /** */ - private void pushRight(Row row) { + private void pushRight(Row row) throws Exception { assert downstream() != null; assert waitingRight > 0; - try { - checkState(); + checkState(); - waitingRight--; + waitingRight--; - if (rightInBuf == null) - rightInBuf = new ArrayList<>(rightInBufferSize); + if (rightInBuf == null) + rightInBuf = new ArrayList<>(rightInBufferSize); - rightInBuf.add(row); + rightInBuf.add(row); - onPushRight(); - } - catch (Exception e) { - onError(e); - } + onPushRight(); } /** */ - private void endLeft() { + private void endLeft() throws Exception { assert downstream() != null; assert waitingLeft > 0; - try { - checkState(); + checkState(); - waitingLeft = -1; + waitingLeft = -1; - if (leftInBuf == null) - leftInBuf = Collections.emptyList(); + if (leftInBuf == null) + leftInBuf = Collections.emptyList(); - onEndLeft(); - } - catch (Exception e) { - onError(e); - } + onEndLeft(); } /** */ - private void endRight() { + 
private void endRight() throws Exception { assert downstream() != null; assert waitingRight > 0; - try { - checkState(); + checkState(); - waitingRight = -1; + waitingRight = -1; - if (rightInBuf == null) - rightInBuf = Collections.emptyList(); + if (rightInBuf == null) + rightInBuf = Collections.emptyList(); - onEndRight(); - } - catch (Exception e) { - onError(e); - } + onEndRight(); } /** */ - private void onRequest() { + private void onRequest() throws Exception { switch (state) { case IN_LOOP: case FILLING_RIGHT: @@ -265,16 +240,11 @@ public class CorrelatedNestedLoopJoinNode<Row> extends AbstractNode<Row> { assert F.isEmpty(rightInBuf); context().execute(() -> { - try { - checkState(); + checkState(); - state = State.FILLING_LEFT; - leftSource().request(waitingLeft = leftInBufferSize); - } - catch (Exception e) { - onError(e); - } - }); + state = State.FILLING_LEFT; + leftSource().request(waitingLeft = leftInBufferSize); + }, this::onError); break; case IDLE: @@ -284,15 +254,10 @@ public class CorrelatedNestedLoopJoinNode<Row> extends AbstractNode<Row> { assert waitingLeft == -1 || waitingLeft == 0 && leftInBuf.size() == leftInBufferSize; context().execute(() -> { - try { - checkState(); + checkState(); - join(); - } - catch (Exception e) { - onError(e); - } - }); + join(); + }, this::onError); break; @@ -306,7 +271,7 @@ public class CorrelatedNestedLoopJoinNode<Row> extends AbstractNode<Row> { } /** */ - private void onPushLeft() { + private void onPushLeft() throws Exception { assert state == State.FILLING_LEFT : "Unexpected state:" + state; assert waitingRight == 0 || waitingRight == -1; assert F.isEmpty(rightInBuf); @@ -325,7 +290,7 @@ public class CorrelatedNestedLoopJoinNode<Row> extends AbstractNode<Row> { } /** */ - private void onPushRight() throws IgniteCheckedException { + private void onPushRight() throws Exception { assert state == State.FILLING_RIGHT : "Unexpected state:" + state; assert !F.isEmpty(leftInBuf); assert waitingLeft == -1 || 
waitingLeft == 0 && leftInBuf.size() == leftInBufferSize; @@ -340,7 +305,7 @@ public class CorrelatedNestedLoopJoinNode<Row> extends AbstractNode<Row> { } /** */ - private void onEndLeft() { + private void onEndLeft() throws Exception { assert state == State.FILLING_LEFT : "Unexpected state:" + state; assert waitingLeft == -1; assert waitingRight == 0 || waitingRight == -1; @@ -367,7 +332,7 @@ public class CorrelatedNestedLoopJoinNode<Row> extends AbstractNode<Row> { } /** */ - private void onEndRight() throws IgniteCheckedException { + private void onEndRight() throws Exception { assert state == State.FILLING_RIGHT : "Unexpected state:" + state; assert waitingRight == -1; assert !F.isEmpty(leftInBuf); @@ -379,7 +344,7 @@ public class CorrelatedNestedLoopJoinNode<Row> extends AbstractNode<Row> { } /** */ - private void join() throws IgniteCheckedException { + private void join() throws Exception { assert state == State.IDLE; state = State.IN_LOOP; @@ -466,6 +431,7 @@ public class CorrelatedNestedLoopJoinNode<Row> extends AbstractNode<Row> { return sources().get(1); } + /** */ private void prepareCorrelations() { for (int i = 0; i < correlationIds.size(); i++) { Row row = i < leftInBuf.size() ? leftInBuf.get(i) : F.first(leftInBuf); diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Downstream.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Downstream.java index 50b7d66..bf274c1 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Downstream.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Downstream.java @@ -20,7 +20,7 @@ package org.apache.ignite.internal.processors.query.calcite.exec.rel; /** * Represents an abstract data consumer. 
* - * <p/><b>Note</b>: except several cases (like consumer node and mailboxes), {@link Node#request(int)}, {@link Node#cancel()}, + * <p/><b>Note</b>: except several cases (like consumer node and mailboxes), {@link Node#request(int)}, * {@link Downstream#push(Object)} and {@link Downstream#end()} methods should be used from one single thread. */ public interface Downstream<Row> { @@ -28,12 +28,12 @@ public interface Downstream<Row> { * Pushes a row to consumer. * @param row Data row. */ - void push(Row row); + void push(Row row) throws Exception; /** * Signals that data is over. */ - void end(); + void end() throws Exception; /** */ void onError(Throwable e); diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/FilterNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/FilterNode.java index cbc0922..b0c0c6d 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/FilterNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/FilterNode.java @@ -20,8 +20,8 @@ package org.apache.ignite.internal.processors.query.calcite.exec.rel; import java.util.ArrayDeque; import java.util.Deque; import java.util.function.Predicate; + import org.apache.calcite.rel.type.RelDataType; -import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; import org.apache.ignite.internal.util.typedef.F; @@ -55,58 +55,43 @@ public class FilterNode<Row> extends AbstractNode<Row> implements SingleNode<Row } /** {@inheritDoc} */ - @Override public void request(int rowsCnt) { + @Override public void request(int rowsCnt) throws Exception { assert !F.isEmpty(sources()) && sources().size() == 1; assert rowsCnt > 0 && requested == 0; - try { - checkState(); + checkState(); - requested = rowsCnt; + requested = rowsCnt; - if (!inLoop) - 
context().execute(this::doFilter); - } - catch (Exception e) { - onError(e); - } + if (!inLoop) + context().execute(this::doFilter, this::onError); } /** {@inheritDoc} */ - @Override public void push(Row row) { + @Override public void push(Row row) throws Exception { assert downstream() != null; assert waiting > 0; - try { - checkState(); + checkState(); - waiting--; + waiting--; - if (pred.test(row)) - inBuf.add(row); + if (pred.test(row)) + inBuf.add(row); - filter(); - } - catch (Exception e) { - onError(e); - } + filter(); } /** {@inheritDoc} */ - @Override public void end() { + @Override public void end() throws Exception { assert downstream() != null; assert waiting > 0; - try { - checkState(); + checkState(); - waiting = -1; + waiting = -1; - filter(); - } - catch (Exception e) { - onError(e); - } + filter(); } /** {@inheritDoc} */ @@ -125,19 +110,14 @@ public class FilterNode<Row> extends AbstractNode<Row> implements SingleNode<Row } /** */ - private void doFilter() { - try { - checkState(); + private void doFilter() throws Exception { + checkState(); - filter(); - } - catch (Exception e) { - onError(e); - } + filter(); } /** */ - private void filter() throws IgniteCheckedException { + private void filter() throws Exception { inLoop = true; try { while (requested > 0 && !inBuf.isEmpty()) { diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java index 7bdc0ac..ba2a146 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java @@ -124,21 +124,16 @@ public class Inbox<Row> extends AbstractNode<Row> implements Mailbox<Row>, Singl } /** {@inheritDoc} */ - @Override public void request(int rowsCnt) { + @Override public void request(int 
rowsCnt) throws Exception { assert srcNodeIds != null; assert rowsCnt > 0 && requested == 0; - try { - checkState(); + checkState(); - requested = rowsCnt; + requested = rowsCnt; - if (!inLoop) - context().execute(this::doPush); - } - catch (Exception e) { - onError(e); - } + if (!inLoop) + context().execute(this::doPush, this::onError); } /** {@inheritDoc} */ @@ -171,36 +166,26 @@ public class Inbox<Row> extends AbstractNode<Row> implements Mailbox<Row>, Singl * @param last Last batch flag. * @param rows Rows. */ - public void onBatchReceived(UUID src, int batchId, boolean last, List<Row> rows) { - try { - Buffer buf = getOrCreateBuffer(src); + public void onBatchReceived(UUID src, int batchId, boolean last, List<Row> rows) throws Exception { + Buffer buf = getOrCreateBuffer(src); - boolean waitingBefore = buf.check() == State.WAITING; + boolean waitingBefore = buf.check() == State.WAITING; - buf.offer(batchId, last, rows); + buf.offer(batchId, last, rows); - if (requested > 0 && waitingBefore && buf.check() != State.WAITING) - push(); - } - catch (Exception e) { - onError(e); - } + if (requested > 0 && waitingBefore && buf.check() != State.WAITING) + push(); } /** */ - private void doPush() { - try { - checkState(); + private void doPush() throws Exception { + checkState(); - push(); - } - catch (Exception e) { - onError(e); - } + push(); } /** */ - private void push() throws IgniteCheckedException { + private void push() throws Exception { if (buffers == null) { for (UUID node : srcNodeIds) checkNode(node); @@ -219,7 +204,7 @@ public class Inbox<Row> extends AbstractNode<Row> implements Mailbox<Row>, Singl } /** */ - private void pushOrdered() throws IgniteCheckedException { + private void pushOrdered() throws Exception { PriorityQueue<Pair<Row, Buffer>> heap = new PriorityQueue<>(Math.max(buffers.size(), 1), Map.Entry.comparingByKey(comp)); @@ -281,7 +266,7 @@ public class Inbox<Row> extends AbstractNode<Row> implements Mailbox<Row>, Singl } /** */ - private 
void pushUnordered() throws IgniteCheckedException { + private void pushUnordered() throws Exception { int idx = 0, noProgress = 0; inLoop = true; @@ -341,22 +326,17 @@ public class Inbox<Row> extends AbstractNode<Row> implements Mailbox<Row>, Singl /** */ public void onNodeLeft(UUID nodeId) { if (context().originatingNodeId().equals(nodeId) && srcNodeIds == null) - context().execute(this::close); + context().execute(this::close, this::onError); else if (srcNodeIds != null && srcNodeIds.contains(nodeId)) - context().execute(() -> onNodeLeft0(nodeId)); + context().execute(() -> onNodeLeft0(nodeId), this::onError); } /** */ - private void onNodeLeft0(UUID nodeId) { - try { - checkState(); + private void onNodeLeft0(UUID nodeId) throws Exception { + checkState(); - if (getOrCreateBuffer(nodeId).check() != State.END) - onError(new ClusterTopologyCheckedException("Failed to execute query, node left [nodeId=" + nodeId + ']')); - } - catch (Exception e) { - onError(e); - } + if (getOrCreateBuffer(nodeId).check() != State.END) + throw new ClusterTopologyCheckedException("Failed to execute query, node left [nodeId=" + nodeId + ']'); } /** */ diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/IndexSpoolNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/IndexSpoolNode.java index d8745f6..51607d3 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/IndexSpoolNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/IndexSpoolNode.java @@ -101,62 +101,47 @@ public class IndexSpoolNode<Row> extends AbstractNode<Row> implements SingleNode } /** {@inheritDoc} */ - @Override public void request(int rowsCnt) { + @Override public void request(int rowsCnt) throws Exception { assert !F.isEmpty(sources()) && sources().size() == 1; assert rowsCnt > 0; - try { - checkState(); + 
checkState(); - if (!indexReady()) { - requested = rowsCnt; + if (!indexReady()) { + requested = rowsCnt; - requestSource(); - } - else - scan.request(rowsCnt); - } - catch (Exception e) { - onError(e); + requestSource(); } + else + scan.request(rowsCnt); } /** */ - private void requestSource() { + private void requestSource() throws Exception { waiting = IN_BUFFER_SIZE; source().request(IN_BUFFER_SIZE); } /** {@inheritDoc} */ - @Override public void push(Row row) { - try { - checkState(); + @Override public void push(Row row) throws Exception { + checkState(); - idx.push(row); + idx.push(row); - waiting--; + waiting--; - if (waiting == 0) - context().execute(this::requestSource); - } - catch (Exception e) { - onError(e); - } + if (waiting == 0) + context().execute(this::requestSource, this::onError); } /** {@inheritDoc} */ - @Override public void end() { - try { - checkState(); + @Override public void end() throws Exception { + checkState(); - waiting = -1; + waiting = -1; - scan.request(requested); - } - catch (Exception e) { - scan.downstream().onError(e); - } + scan.request(requested); } /** {@inheritDoc} */ diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/LimitNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/LimitNode.java index 842a44b..ce85c38 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/LimitNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/LimitNode.java @@ -60,7 +60,7 @@ public class LimitNode<Row> extends AbstractNode<Row> implements SingleNode<Row> } /** {@inheritDoc} */ - @Override public void request(int rowsCnt) { + @Override public void request(int rowsCnt) throws Exception { assert !F.isEmpty(sources()) && sources().size() == 1; assert rowsCnt > 0; @@ -73,18 +73,13 @@ public class LimitNode<Row> extends AbstractNode<Row> 
implements SingleNode<Row> if (offset > 0 && rowsProcessed == 0) rowsCnt = offset + rowsCnt; - try { - checkState(); + checkState(); - source().request(waiting = rowsCnt); - } - catch (Exception e) { - onError(e); - } + source().request(waiting = rowsCnt); } /** {@inheritDoc} */ - @Override public void push(Row row) { + @Override public void push(Row row) throws Exception { if (waiting == -1) return; @@ -92,12 +87,7 @@ public class LimitNode<Row> extends AbstractNode<Row> implements SingleNode<Row> --waiting; - try { - checkState(); - } - catch (Throwable e) { - onError(e); - } + checkState(); if (rowsProcessed > offset) { if (fetchNode == null || (fetchNode != null && rowsProcessed <= fetch + offset)) @@ -109,20 +99,15 @@ public class LimitNode<Row> extends AbstractNode<Row> implements SingleNode<Row> } /** {@inheritDoc} */ - @Override public void end() { - try { - if (waiting == -1) - return; + @Override public void end() throws Exception { + if (waiting == -1) + return; - assert downstream() != null; + assert downstream() != null; - waiting = -1; + waiting = -1; - downstream().end(); - } - catch (Exception e) { - onError(e); - } + downstream().end(); } /** {@inheritDoc} */ diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java index feaa373..e97968b85 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java @@ -22,9 +22,9 @@ import java.util.ArrayList; import java.util.Comparator; import java.util.Deque; import java.util.List; + import org.apache.calcite.rel.core.JoinRelType; import org.apache.calcite.rel.type.RelDataType; -import org.apache.ignite.IgniteCheckedException; import 
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler; import org.apache.ignite.internal.util.typedef.F; @@ -71,33 +71,23 @@ public abstract class MergeJoinNode<Row> extends AbstractNode<Row> { } /** {@inheritDoc} */ - @Override public void request(int rowsCnt) { + @Override public void request(int rowsCnt) throws Exception { assert !F.isEmpty(sources()) && sources().size() == 2; assert rowsCnt > 0 && requested == 0; - try { - checkState(); + checkState(); - requested = rowsCnt; + requested = rowsCnt; - if (!inLoop) - context().execute(this::doJoin); - } - catch (Exception e) { - onError(e); - } + if (!inLoop) + context().execute(this::doJoin, this::onError); } /** */ - private void doJoin() { - try { - checkState(); + private void doJoin() throws Exception { + checkState(); - join(); - } - catch (Exception e) { - onError(e); - } + join(); } /** {@inheritDoc} */ @@ -115,12 +105,12 @@ public abstract class MergeJoinNode<Row> extends AbstractNode<Row> { if (idx == 0) return new Downstream<Row>() { /** {@inheritDoc} */ - @Override public void push(Row row) { + @Override public void push(Row row) throws Exception { pushLeft(row); } /** {@inheritDoc} */ - @Override public void end() { + @Override public void end() throws Exception { endLeft(); } @@ -132,12 +122,12 @@ public abstract class MergeJoinNode<Row> extends AbstractNode<Row> { else if (idx == 1) return new Downstream<Row>() { /** {@inheritDoc} */ - @Override public void push(Row row) { + @Override public void push(Row row) throws Exception { pushRight(row); } /** {@inheritDoc} */ - @Override public void end() { + @Override public void end() throws Exception { endRight(); } @@ -151,75 +141,55 @@ public abstract class MergeJoinNode<Row> extends AbstractNode<Row> { } /** */ - private void pushLeft(Row row) { + private void pushLeft(Row row) throws Exception { assert downstream() != null; assert waitingLeft > 0; - try { - 
checkState(); + checkState(); - waitingLeft--; + waitingLeft--; - leftInBuf.add(row); + leftInBuf.add(row); - join(); - } - catch (Exception e) { - onError(e); - } + join(); } /** */ - private void pushRight(Row row) { + private void pushRight(Row row) throws Exception { assert downstream() != null; assert waitingRight > 0; - try { - checkState(); + checkState(); - waitingRight--; + waitingRight--; - rightInBuf.add(row); + rightInBuf.add(row); - join(); - } - catch (Exception e) { - onError(e); - } + join(); } /** */ - private void endLeft() { + private void endLeft() throws Exception { assert downstream() != null; assert waitingLeft > 0; - try { - checkState(); + checkState(); - waitingLeft = NOT_WAITING; + waitingLeft = NOT_WAITING; - join(); - } - catch (Exception e) { - onError(e); - } + join(); } /** */ - private void endRight() { + private void endRight() throws Exception { assert downstream() != null; assert waitingRight > 0; - try { - checkState(); + checkState(); - waitingRight = NOT_WAITING; + waitingRight = NOT_WAITING; - join(); - } - catch (Exception e) { - onError(e); - } + join(); } /** */ @@ -233,7 +203,7 @@ public abstract class MergeJoinNode<Row> extends AbstractNode<Row> { } /** */ - protected abstract void join() throws IgniteCheckedException; + protected abstract void join() throws Exception; /** */ @NotNull public static <Row> MergeJoinNode<Row> create(ExecutionContext<Row> ctx, RelDataType outputRowType, RelDataType leftRowType, @@ -310,7 +280,7 @@ public abstract class MergeJoinNode<Row> extends AbstractNode<Row> { } /** {@inheritDoc} */ - @Override protected void join() throws IgniteCheckedException { + @Override protected void join() throws Exception { inLoop = true; try { while (requested > 0 && (left != null || !leftInBuf.isEmpty()) && (right != null || !rightInBuf.isEmpty() || rightMaterialization != null)) { @@ -466,7 +436,7 @@ public abstract class MergeJoinNode<Row> extends AbstractNode<Row> { } /** {@inheritDoc} */ - @Override 
protected void join() throws IgniteCheckedException { + @Override protected void join() throws Exception { inLoop = true; try { while (requested > 0 && (left != null || !leftInBuf.isEmpty()) @@ -639,7 +609,7 @@ public abstract class MergeJoinNode<Row> extends AbstractNode<Row> { } /** {@inheritDoc} */ - @Override protected void join() throws IgniteCheckedException { + @Override protected void join() throws Exception { inLoop = true; try { while (requested > 0 && !(left == null && leftInBuf.isEmpty() && waitingLeft != NOT_WAITING) @@ -835,7 +805,7 @@ public abstract class MergeJoinNode<Row> extends AbstractNode<Row> { } /** {@inheritDoc} */ - @Override protected void join() throws IgniteCheckedException { + @Override protected void join() throws Exception { inLoop = true; try { while (requested > 0 && !(left == null && leftInBuf.isEmpty() && waitingLeft != NOT_WAITING) @@ -1033,7 +1003,7 @@ public abstract class MergeJoinNode<Row> extends AbstractNode<Row> { } /** {@inheritDoc} */ - @Override protected void join() throws IgniteCheckedException { + @Override protected void join() throws Exception { inLoop = true; try { while (requested > 0 && (left != null || !leftInBuf.isEmpty()) && (right != null || !rightInBuf.isEmpty())) { @@ -1109,7 +1079,7 @@ public abstract class MergeJoinNode<Row> extends AbstractNode<Row> { } /** {@inheritDoc} */ - @Override protected void join() throws IgniteCheckedException { + @Override protected void join() throws Exception { inLoop = true; try { while (requested > 0 && (left != null || !leftInBuf.isEmpty()) && !(right == null && rightInBuf.isEmpty() && waitingRight != NOT_WAITING)) { diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java index 5d253c3..d1aaa38 100644 --- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java @@ -89,71 +89,56 @@ public class ModifyNode<Row> extends AbstractNode<Row> implements SingleNode<Row } /** {@inheritDoc} */ - @Override public void request(int rowsCnt) { + @Override public void request(int rowsCnt) throws Exception { assert !F.isEmpty(sources()) && sources().size() == 1; assert rowsCnt > 0 && requested == 0; - try { - checkState(); + checkState(); - requested = rowsCnt; + requested = rowsCnt; - if (!inLoop) - tryEnd(); - } - catch (Exception e) { - onError(e); - } + if (!inLoop) + tryEnd(); } /** {@inheritDoc} */ - @Override public void push(Row row) { + @Override public void push(Row row) throws Exception { assert downstream() != null; assert waiting > 0; assert state == State.UPDATING; - try { - checkState(); + checkState(); - waiting--; + waiting--; - switch (op) { - case DELETE: - case UPDATE: - case INSERT: - tuples.add(desc.toTuple(context(), row, op, cols)); - - flushTuples(false); + switch (op) { + case DELETE: + case UPDATE: + case INSERT: + tuples.add(desc.toTuple(context(), row, op, cols)); - break; - default: - throw new UnsupportedOperationException(op.name()); - } + flushTuples(false); - if (waiting == 0) - source().request(waiting = MODIFY_BATCH_SIZE); - } - catch (Exception e) { - onError(e); + break; + default: + throw new UnsupportedOperationException(op.name()); } + + if (waiting == 0) + source().request(waiting = MODIFY_BATCH_SIZE); } /** {@inheritDoc} */ - @Override public void end() { + @Override public void end() throws Exception { assert downstream() != null; assert waiting > 0; - try { - checkState(); + checkState(); - waiting = -1; - state = State.UPDATED; + waiting = -1; + state = State.UPDATED; - tryEnd(); - } - catch (Exception e) { - onError(e); - } + tryEnd(); } /** {@inheritDoc} */ @@ -170,7 +155,7 
@@ public class ModifyNode<Row> extends AbstractNode<Row> implements SingleNode<Row } /** */ - private void tryEnd() throws IgniteCheckedException { + private void tryEnd() throws Exception { assert downstream() != null; if (state == State.UPDATING && waiting == 0) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java index b26649a..a27e9f2 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java @@ -23,9 +23,9 @@ import java.util.BitSet; import java.util.Deque; import java.util.List; import java.util.function.Predicate; + import org.apache.calcite.rel.core.JoinRelType; import org.apache.calcite.rel.type.RelDataType; -import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler; import org.apache.ignite.internal.util.typedef.F; @@ -72,33 +72,23 @@ public abstract class NestedLoopJoinNode<Row> extends AbstractNode<Row> { } /** {@inheritDoc} */ - @Override public void request(int rowsCnt) { + @Override public void request(int rowsCnt) throws Exception { assert !F.isEmpty(sources()) && sources().size() == 2; assert rowsCnt > 0 && requested == 0; - try { - checkState(); + checkState(); - requested = rowsCnt; + requested = rowsCnt; - if (!inLoop) - context().execute(this::doJoin); - } - catch (Exception e) { - onError(e); - } + if (!inLoop) + context().execute(this::doJoin, this::onError); } /** */ - private void doJoin() { - try { - checkState(); + private void doJoin() throws Exception { + checkState(); - join(); - } - catch (Exception e) { - 
onError(e); - } + join(); } /** {@inheritDoc} */ @@ -116,12 +106,12 @@ public abstract class NestedLoopJoinNode<Row> extends AbstractNode<Row> { if (idx == 0) return new Downstream<Row>() { /** {@inheritDoc} */ - @Override public void push(Row row) { + @Override public void push(Row row) throws Exception { pushLeft(row); } /** {@inheritDoc} */ - @Override public void end() { + @Override public void end() throws Exception { endLeft(); } @@ -133,12 +123,12 @@ public abstract class NestedLoopJoinNode<Row> extends AbstractNode<Row> { else if (idx == 1) return new Downstream<Row>() { /** {@inheritDoc} */ - @Override public void push(Row row) { + @Override public void push(Row row) throws Exception { pushRight(row); } /** {@inheritDoc} */ - @Override public void end() { + @Override public void end() throws Exception { endRight(); } @@ -152,76 +142,56 @@ public abstract class NestedLoopJoinNode<Row> extends AbstractNode<Row> { } /** */ - private void pushLeft(Row row) { + private void pushLeft(Row row) throws Exception { assert downstream() != null; assert waitingLeft > 0; - try { - checkState(); + checkState(); - waitingLeft--; + waitingLeft--; - leftInBuf.add(row); + leftInBuf.add(row); - join(); - } - catch (Exception e) { - onError(e); - } + join(); } /** */ - private void pushRight(Row row) { + private void pushRight(Row row) throws Exception { assert downstream() != null; assert waitingRight > 0; - try { - checkState(); + checkState(); - waitingRight--; + waitingRight--; - rightMaterialized.add(row); + rightMaterialized.add(row); - if (waitingRight == 0) - rightSource().request(waitingRight = IN_BUFFER_SIZE); - } - catch (Exception e) { - onError(e); - } + if (waitingRight == 0) + rightSource().request(waitingRight = IN_BUFFER_SIZE); } /** */ - private void endLeft() { + private void endLeft() throws Exception { assert downstream() != null; assert waitingLeft > 0; - try { - checkState(); + checkState(); - waitingLeft = NOT_WAITING; + waitingLeft = NOT_WAITING; - 
join(); - } - catch (Exception e) { - onError(e); - } + join(); } /** */ - private void endRight() { + private void endRight() throws Exception { assert downstream() != null; assert waitingRight > 0; - try { - checkState(); + checkState(); - waitingRight = NOT_WAITING; + waitingRight = NOT_WAITING; - join(); - } - catch (Exception e) { - onError(e); - } + join(); } /** */ @@ -235,7 +205,7 @@ public abstract class NestedLoopJoinNode<Row> extends AbstractNode<Row> { } /** */ - protected abstract void join() throws IgniteCheckedException; + protected abstract void join() throws Exception; /** */ @NotNull public static <Row> NestedLoopJoinNode<Row> create(ExecutionContext<Row> ctx, RelDataType outputRowType, RelDataType leftRowType, @@ -299,7 +269,7 @@ public abstract class NestedLoopJoinNode<Row> extends AbstractNode<Row> { } /** */ - @Override protected void join() throws IgniteCheckedException { + @Override protected void join() throws Exception { if (waitingRight == NOT_WAITING) { inLoop = true; try { @@ -377,7 +347,7 @@ public abstract class NestedLoopJoinNode<Row> extends AbstractNode<Row> { } /** {@inheritDoc} */ - @Override protected void join() throws IgniteCheckedException { + @Override protected void join() throws Exception { if (waitingRight == NOT_WAITING) { inLoop = true; try { @@ -474,7 +444,7 @@ public abstract class NestedLoopJoinNode<Row> extends AbstractNode<Row> { } /** {@inheritDoc} */ - @Override protected void join() throws IgniteCheckedException { + @Override protected void join() throws Exception { if (waitingRight == NOT_WAITING) { if (rightNotMatchedIndexes == null) { rightNotMatchedIndexes = new BitSet(rightMaterialized.size()); @@ -603,7 +573,7 @@ public abstract class NestedLoopJoinNode<Row> extends AbstractNode<Row> { } /** {@inheritDoc} */ - @Override protected void join() throws IgniteCheckedException { + @Override protected void join() throws Exception { if (waitingRight == NOT_WAITING) { if (rightNotMatchedIndexes == null) { 
rightNotMatchedIndexes = new BitSet(rightMaterialized.size()); @@ -725,7 +695,7 @@ public abstract class NestedLoopJoinNode<Row> extends AbstractNode<Row> { } /** {@inheritDoc} */ - @Override protected void join() throws IgniteCheckedException { + @Override protected void join() throws Exception { if (waitingRight == NOT_WAITING) { while (requested > 0 && (left != null || !leftInBuf.isEmpty())) { if (left == null) @@ -793,7 +763,7 @@ public abstract class NestedLoopJoinNode<Row> extends AbstractNode<Row> { } /** {@inheritDoc} */ - @Override protected void join() throws IgniteCheckedException { + @Override protected void join() throws Exception { if (waitingRight == NOT_WAITING) { inLoop = true; try { diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Node.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Node.java index b9833af..59daa66 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Node.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Node.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.query.calcite.exec.rel; import java.util.List; + import org.apache.calcite.rel.type.RelDataType; import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; @@ -71,7 +72,7 @@ public interface Node<Row> extends AutoCloseable { /** * Requests next bunch of rows. */ - void request(int rowsCnt); + void request(int rowsCnt) throws Exception; /** * Rewinds upstream. 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java index 3c44646..7b8c3da 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; import java.util.stream.Collectors; + import org.apache.calcite.rel.type.RelDataType; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.processors.query.calcite.exec.ExchangeService; @@ -100,17 +101,12 @@ public class Outbox<Row> extends AbstractNode<Row> implements Mailbox<Row>, Sing * @param nodeId Target ID. * @param batchId Batch ID. */ - public void onAcknowledge(UUID nodeId, int batchId) { + public void onAcknowledge(UUID nodeId, int batchId) throws Exception { assert nodeBuffers.containsKey(nodeId); - try { - checkState(); + checkState(); - nodeBuffers.get(nodeId).acknowledge(batchId); - } - catch (Exception e) { - onError(e); - } + nodeBuffers.get(nodeId).acknowledge(batchId); } /** */ @@ -120,8 +116,8 @@ public class Outbox<Row> extends AbstractNode<Row> implements Mailbox<Row>, Sing flush(); } - catch (Exception e) { - onError(e); + catch (Throwable t) { + onError(t); } } @@ -131,37 +127,27 @@ public class Outbox<Row> extends AbstractNode<Row> implements Mailbox<Row>, Sing } /** {@inheritDoc} */ - @Override public void push(Row row) { + @Override public void push(Row row) throws Exception { assert waiting > 0; - try { - checkState(); + checkState(); - waiting--; + waiting--; - inBuf.add(row); + inBuf.add(row); - flush(); - } - catch (Exception e) { - onError(e); - } + flush(); } /** {@inheritDoc} */ - @Override public void end() { + @Override public void end() throws Exception { 
assert waiting > 0; - try { - checkState(); + checkState(); - waiting = -1; + waiting = -1; - flush(); - } - catch (Exception e) { - onError(e); - } + flush(); } /** {@inheritDoc} */ @@ -241,7 +227,7 @@ public class Outbox<Row> extends AbstractNode<Row> implements Mailbox<Row>, Sing } /** */ - private void flush() throws IgniteCheckedException { + private void flush() throws Exception { while (!inBuf.isEmpty()) { checkState(); @@ -273,7 +259,7 @@ public class Outbox<Row> extends AbstractNode<Row> implements Mailbox<Row>, Sing /** */ public void onNodeLeft(UUID nodeId) { if (nodeId.equals(context().originatingNodeId())) - context().execute(this::close); + context().execute(this::close, this::onError); } /** */ @@ -347,7 +333,7 @@ public class Outbox<Row> extends AbstractNode<Row> implements Mailbox<Row>, Sing * * @param id batch ID. */ - private void acknowledge(int id) throws IgniteCheckedException { + private void acknowledge(int id) throws Exception { if (lwm > id) return; diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ProjectNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ProjectNode.java index 4da1284..6b98efc 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ProjectNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ProjectNode.java @@ -45,46 +45,31 @@ public class ProjectNode<Row> extends AbstractNode<Row> implements SingleNode<Ro } /** {@inheritDoc} */ - @Override public void request(int rowsCnt) { + @Override public void request(int rowsCnt) throws Exception { assert !F.isEmpty(sources()) && sources().size() == 1; assert rowsCnt > 0; - try { - checkState(); + checkState(); - source().request(rowsCnt); - } - catch (Exception e) { - onError(e); - } + source().request(rowsCnt); } /** {@inheritDoc} */ - @Override public void push(Row row) { + 
@Override public void push(Row row) throws Exception { assert downstream() != null; - try { - checkState(); + checkState(); - downstream().push(prj.apply(row)); - } - catch (Throwable e) { - onError(e); - } + downstream().push(prj.apply(row)); } /** {@inheritDoc} */ - @Override public void end() { + @Override public void end() throws Exception { assert downstream() != null; - try { - checkState(); + checkState(); - downstream().end(); - } - catch (Exception e) { - onError(e); - } + downstream().end(); } /** {@inheritDoc} */ diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java index cc57d00..a99693b 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java @@ -121,11 +121,11 @@ public class RootNode<Row> extends AbstractNode<Row> implements SingleNode<Row>, /** {@inheritDoc} */ @Override public void closeInternal() { - context().execute(() -> sources().forEach(U::closeQuiet)); + context().execute(() -> sources().forEach(U::closeQuiet), this::onError); } /** {@inheritDoc} */ - @Override public void push(Row row) { + @Override public void push(Row row) throws Exception { assert waiting > 0; lock.lock(); @@ -139,16 +139,13 @@ public class RootNode<Row> extends AbstractNode<Row> implements SingleNode<Row>, if (inBuff.size() == IN_BUFFER_SIZE) cond.signalAll(); } - catch (Exception e) { - onError(e); - } finally { lock.unlock(); } } /** {@inheritDoc} */ - @Override public void end() { + @Override public void end() throws Exception { assert waiting > 0; lock.lock(); @@ -159,9 +156,6 @@ public class RootNode<Row> extends AbstractNode<Row> implements SingleNode<Row>, cond.signalAll(); } - catch (Exception e) { - onError(e); - } finally { 
lock.unlock(); } @@ -240,7 +234,7 @@ public class RootNode<Row> extends AbstractNode<Row> implements SingleNode<Row>, close(); else if (inBuff.isEmpty() && waiting == 0) { int req = waiting = IN_BUFFER_SIZE; - context().execute(() -> source().request(req)); + context().execute(() -> source().request(req), this::onError); } if (!outBuff.isEmpty() || waiting == -1) @@ -250,7 +244,7 @@ public class RootNode<Row> extends AbstractNode<Row> implements SingleNode<Row>, } } catch (InterruptedException e) { - onError(new IgniteInterruptedException(e)); + throw new IgniteInterruptedException(e); } finally { lock.unlock(); diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java index d83267b..c4805d6 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java @@ -19,8 +19,8 @@ package org.apache.ignite.internal.processors.query.calcite.exec.rel; import java.util.Iterator; import java.util.List; + import org.apache.calcite.rel.type.RelDataType; -import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; import org.apache.ignite.internal.processors.query.calcite.util.Commons; @@ -51,32 +51,22 @@ public class ScanNode<Row> extends AbstractNode<Row> implements SingleNode<Row> } /** {@inheritDoc} */ - @Override public void request(int rowsCnt) { + @Override public void request(int rowsCnt) throws Exception { assert rowsCnt > 0 && requested == 0 : "rowsCnt=" + rowsCnt + ", requested=" + requested; - try { - checkState(); + checkState(); - requested = rowsCnt; + requested = rowsCnt; - if (!inLoop) - context().execute(this::doPush); - } - catch (Exception e) { - onError(e); - } + 
if (!inLoop) + context().execute(this::doPush, this::onError); } /** */ - private void doPush() { - try { - checkState(); + private void doPush() throws Exception { + checkState(); - push(); - } - catch (Exception e) { - onError(e); - } + push(); } /** {@inheritDoc} */ @@ -105,7 +95,7 @@ public class ScanNode<Row> extends AbstractNode<Row> implements SingleNode<Row> } /** */ - private void push() throws IgniteCheckedException { + private void push() throws Exception { inLoop = true; try { if (it == null) @@ -120,7 +110,7 @@ public class ScanNode<Row> extends AbstractNode<Row> implements SingleNode<Row> if (++processed == IN_BUFFER_SIZE && requested > 0) { // allow others to do their job - context().execute(this::doPush); + context().execute(this::doPush, this::onError); return; } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java index cb30293..9f7a31f 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java @@ -18,8 +18,8 @@ package org.apache.ignite.internal.processors.query.calcite.exec.rel; import java.util.Comparator; import java.util.PriorityQueue; + import org.apache.calcite.rel.type.RelDataType; -import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; import org.apache.ignite.internal.util.typedef.F; @@ -65,75 +65,50 @@ public class SortNode<Row> extends AbstractNode<Row> implements SingleNode<Row>, } /** {@inheritDoc} */ - @Override public void request(int rowsCnt) { + @Override public void request(int rowsCnt) throws Exception { assert !F.isEmpty(sources()) && sources().size() == 1; assert rowsCnt > 0 && requested == 0; assert waiting <= 0; - 
try { - checkState(); - - requested = rowsCnt; + checkState(); - if (waiting == 0) - source().request(waiting = IN_BUFFER_SIZE); - else if (!inLoop) - context().execute(this::doFlush); - } - catch (Exception e) { - onError(e); - } - } + requested = rowsCnt; - /** */ - private void doFlush() { - try { - flush(); - } - catch (Exception e) { - onError(e); - } + if (waiting == 0) + source().request(waiting = IN_BUFFER_SIZE); + else if (!inLoop) + context().execute(this::flush, this::onError); } /** {@inheritDoc} */ - @Override public void push(Row row) { + @Override public void push(Row row) throws Exception { assert downstream() != null; assert waiting > 0; - try { - checkState(); + checkState(); - waiting--; + waiting--; - rows.add(row); + rows.add(row); - if (waiting == 0) - source().request(waiting = IN_BUFFER_SIZE); - } - catch (Exception e) { - onError(e); - } + if (waiting == 0) + source().request(waiting = IN_BUFFER_SIZE); } /** {@inheritDoc} */ - @Override public void end() { + @Override public void end() throws Exception { assert downstream() != null; assert waiting > 0; - try { - checkState(); + checkState(); - waiting = -1; + waiting = -1; - flush(); - } - catch (Exception e) { - downstream().onError(e); - } + flush(); } /** */ - private void flush() throws IgniteCheckedException { + private void flush() throws Exception { assert waiting == -1; int processed = 0; @@ -149,7 +124,7 @@ public class SortNode<Row> extends AbstractNode<Row> implements SingleNode<Row>, if (++processed >= IN_BUFFER_SIZE && requested > 0) { // allow others to do their job - context().execute(this::doFlush); + context().execute(this::flush, this::onError); return; } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/TableSpoolNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/TableSpoolNode.java index d1ad00a..7da2e58 100644 --- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/TableSpoolNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/TableSpoolNode.java @@ -74,27 +74,22 @@ public class TableSpoolNode<Row> extends AbstractNode<Row> implements SingleNode } /** {@inheritDoc} */ - @Override public void request(int rowsCnt) { + @Override public void request(int rowsCnt) throws Exception { assert !F.isEmpty(sources()) && sources().size() == 1; assert rowsCnt > 0; - try { - checkState(); + checkState(); - requested += rowsCnt; + requested += rowsCnt; - if ((waiting == -1 || rowIdx < rows.size()) && !inLoop) - context().execute(this::doPush); - else if (waiting == 0) - source().request(waiting = IN_BUFFER_SIZE); - } - catch (Exception e) { - onError(e); - } + if ((waiting == -1 || rowIdx < rows.size()) && !inLoop) + context().execute(this::doPush, this::onError); + else if (waiting == 0) + source().request(waiting = IN_BUFFER_SIZE); } /** */ - private void doPush() { + private void doPush() throws Exception { if (rowIdx >= rows.size() && waiting == -1 && requested > 0) { downstream().end(); @@ -106,7 +101,7 @@ public class TableSpoolNode<Row> extends AbstractNode<Row> implements SingleNode } /** */ - private void pushToDownstream() { + private void pushToDownstream() throws Exception { inLoop = true; downstream().push(rows.get(rowIdx)); @@ -121,43 +116,33 @@ public class TableSpoolNode<Row> extends AbstractNode<Row> implements SingleNode } /** {@inheritDoc} */ - @Override public void push(Row row) { + @Override public void push(Row row) throws Exception { assert downstream() != null; assert waiting > 0; - try { - checkState(); + checkState(); - waiting--; + waiting--; - rows.add(row); + rows.add(row); - if (waiting == 0) - source().request(waiting = IN_BUFFER_SIZE); + if (waiting == 0) + source().request(waiting = IN_BUFFER_SIZE); - if (requested > 0 && rowIdx < rows.size()) - pushToDownstream(); 
- } - catch (Exception e) { - onError(e); - } + if (requested > 0 && rowIdx < rows.size()) + pushToDownstream(); } /** {@inheritDoc} */ - @Override public void end() { + @Override public void end() throws Exception { assert downstream() != null; assert waiting > 0; - try { - checkState(); + checkState(); - waiting = -1; + waiting = -1; - if (rowIdx >= rows.size() && requested > 0) - downstream().end(); - } - catch (Exception e) { - downstream().onError(e); - } + if (rowIdx >= rows.size() && requested > 0) + downstream().end(); } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/UnionAllNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/UnionAllNode.java index 385577b..c2669e1 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/UnionAllNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/UnionAllNode.java @@ -47,54 +47,39 @@ public class UnionAllNode<Row> extends AbstractNode<Row> implements Downstream<R } /** {@inheritDoc} */ - @Override public void request(int rowsCnt) { + @Override public void request(int rowsCnt) throws Exception { assert !F.isEmpty(sources()); assert rowsCnt > 0 && waiting == 0; - try { - checkState(); + checkState(); - source().request(waiting = rowsCnt); - } - catch (Exception e) { - onError(e); - } + source().request(waiting = rowsCnt); } /** {@inheritDoc} */ - @Override public void push(Row row) { + @Override public void push(Row row) throws Exception { assert downstream() != null; assert waiting > 0; - try { - checkState(); + checkState(); - waiting--; + waiting--; - downstream().push(row); - } - catch (Exception e) { - onError(e); - } + downstream().push(row); } /** {@inheritDoc} */ - @Override public void end() { + @Override public void end() throws Exception { assert downstream() != null; assert waiting > 0; - try { - 
checkState(); + checkState(); - if (++curSrc < sources().size()) - source().request(waiting); - else { - waiting = -1; - downstream().end(); - } - } - catch (Exception e) { - onError(e); + if (++curSrc < sources().size()) + source().request(waiting); + else { + waiting = -1; + downstream().end(); } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ErrorMessage.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ErrorMessage.java index 6d09c09..45b4385 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ErrorMessage.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ErrorMessage.java @@ -167,6 +167,7 @@ public class ErrorMessage implements MarshalableMessage { /** {@inheritDoc} */ @Override public void prepareUnmarshal(MarshallingContext ctx) throws IgniteCheckedException { - err = ctx.unmarshal(errBytes); + if (errBytes != null) + err = ctx.unmarshal(errBytes); } } diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteErrorHandlilngIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteErrorHandlilngIntegrationTest.java new file mode 100644 index 0000000..537d1f2 --- /dev/null +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteErrorHandlilngIntegrationTest.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite; + +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.failure.FailureHandler; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.metric.IoStatisticsHolder; +import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree; +import org.apache.ignite.internal.processors.cache.persistence.tree.CorruptedTreeException; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; +import org.apache.ignite.internal.processors.cache.persistence.tree.util.PageHandler; +import org.apache.ignite.internal.processors.query.calcite.message.QueryStartRequest; +import org.apache.ignite.internal.processors.query.h2.database.H2Tree; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.testframework.GridTestUtils; +import 
org.apache.ignite.testframework.junits.WithSystemProperty; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.After; +import org.junit.Test; + +import static org.apache.ignite.IgniteSystemProperties.IGNITE_EXPERIMENTAL_SQL_ENGINE; +import static org.apache.ignite.internal.processors.query.calcite.exec.LogicalRelImplementor.CNLJ_SUPPORTS_ONLY_INNER_ASSERTION_MSG; + +/** */ +@WithSystemProperty(key = "calcite.debug", value = "false") +@WithSystemProperty(key = IGNITE_EXPERIMENTAL_SQL_ENGINE, value = "true") +public class CalciteErrorHandlilngIntegrationTest extends GridCommonAbstractTest { + /** */ + @After + public void cleanUp() { + stopAllGrids(); + } + + /** + * Test verifies that AssertionError on fragment deserialization phase doesn't lead to execution freezing. + * <ol> + * <li>Start several nodes.</li> + * <li>Replace CommunicationSpi with one that modifies messages (replaces join type inside a QueryStartRequest).</li> + * <li>Execute query that requires CNLJ.</li> + * <li>Verify that query failed with proper exception.</li> + * </ol> + */ + @Test + public void assertionOnDeserialization() throws Exception { + Supplier<TcpCommunicationSpi> spiLsnrSupp = () -> new TcpCommunicationSpi() { + /** {@inheritDoc} */ + @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) { + if (msg instanceof GridIoMessage && ((GridIoMessage)msg).message() instanceof QueryStartRequest) { + QueryStartRequest req = (QueryStartRequest)((GridIoMessage)msg).message(); + + String root = GridTestUtils.getFieldValue(req, "root"); + + GridTestUtils.setFieldValue(req, "root", + root.replace("\"joinType\":\"inner\"", "\"joinType\":\"full\"")); + } + + super.sendMessage(node, msg, ackC); + } + }; + + startGrid(createConfiguration(1, false).setCommunicationSpi(spiLsnrSupp.get())); + startGrid(createConfiguration(2, false).setCommunicationSpi(spiLsnrSupp.get())); + + IgniteEx client = 
startGrid(createConfiguration(0, true).setCommunicationSpi(spiLsnrSupp.get())); + + sql(client, "create table test (id int primary key, val varchar)"); + + String sql = "select /*+ DISABLE_RULE('NestedLoopJoinConverter', 'MergeJoinConverter') */ t1.id from test t1, test t2 where t1.id = t2.id"; + + Throwable t = GridTestUtils.assertThrowsWithCause(() -> sql(client, sql), AssertionError.class); + assertEquals(CNLJ_SUPPORTS_ONLY_INNER_ASSERTION_MSG, t.getCause().getMessage()); + } + + /** + * Test verifies that an exception during an index lookup doesn't lead to execution freezing. + * <ol> + * <li>Start several nodes.</li> + * <li>Inject tree's action wrapper that throws exception on demand.</li> + * <li>Execute a query that does an index lookup.</li> + * <li>Verify that query failed with proper exception.</li> + * <li>Verify that FailureHandler was triggered.</li> + * </ol> + */ + @Test + @SuppressWarnings("ThrowableNotThrown") + public void assertionOnTreeLookup() throws Exception { + AtomicBoolean shouldThrow = new AtomicBoolean(); + + BPlusTree.testHndWrapper = (tree, hnd) -> { + if (hnd instanceof BPlusTree.Search) { + PageHandler<Object, BPlusTree.Result> delegate = (PageHandler<Object, BPlusTree.Result>)hnd; + + return new PageHandler<Object, BPlusTree.Result>() { + @Override public BPlusTree.Result run( + int cacheId, + long pageId, + long page, + long pageAddr, + PageIO io, + Boolean walPlc, + Object arg, + int intArg, + IoStatisticsHolder statHolder + ) throws IgniteCheckedException { + BPlusTree.Result res = + delegate.run(cacheId, pageId, page, pageAddr, io, walPlc, arg, intArg, statHolder); + + if (shouldThrow.get() && tree instanceof H2Tree) + throw new RuntimeException("test exception"); + + return res; + } + }; + } + else + return hnd; + }; + + try { + CountDownLatch latch = new CountDownLatch(1); + + FailureHandler failureHnd = (ignite, failureCtx) -> { + latch.countDown(); + + return false; + }; + + startGrid(createConfiguration(1, 
false).setFailureHandler(failureHnd)); + startGrid(createConfiguration(2, false).setFailureHandler(failureHnd)); + + IgniteEx client = startGrid(createConfiguration(0, true)); + + sql(client, "create table test (id integer primary key, val varchar)"); + sql(client, "create index test_id_idx on test (id)"); + + awaitPartitionMapExchange(true, true, null); + + shouldThrow.set(true); + + List<String> sqls = F.asList( + "select id from test where id > -10", + "select max(id) from test where id > -10" + ); + + for (String sql : sqls) { + GridTestUtils.assertThrowsWithCause(() -> sql(client, sql), CorruptedTreeException.class); + + assertTrue("Failure handler was not invoked", latch.await(5, TimeUnit.SECONDS)); + } + } + finally { + BPlusTree.testHndWrapper = null; + } + } + + /** */ + private List<List<?>> sql(IgniteEx ignite, String sql, Object... args) { + return ignite.context().query().querySqlFields( + new SqlFieldsQuery(sql).setSchema("PUBLIC").setArgs(args), false).getAll(); + } + + /** */ + private IgniteConfiguration createConfiguration(int id, boolean client) throws Exception { + return getConfiguration(client ? 
"client" : "server-" + id).setClientMode(client); + } +} diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ContinuousExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ContinuousExecutionTest.java index b9b7516..e93bd56 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ContinuousExecutionTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ContinuousExecutionTest.java @@ -155,7 +155,7 @@ public class ContinuousExecutionTest extends AbstractExecutionTest { outbox.register(filter); registry.register(outbox); - outbox.context().execute(outbox::init); + outbox.context().execute(outbox::init, outbox::onError); } UUID locNodeId = nodes.get(0); diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java index 729b79b..4ba60cb 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java @@ -33,6 +33,7 @@ import org.apache.calcite.sql.fun.SqlStdOperatorTable; import org.apache.calcite.util.ImmutableBitSet; import org.apache.calcite.util.ImmutableIntList; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.processors.query.calcite.exec.ArrayRowHandler; import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler; @@ -1047,6 +1048,40 @@ public class ExecutionTest extends AbstractExecutionTest { } /** + * Test verifies 
that an AssertionError thrown from an execution node + * is properly handled by a task executor. + */ + @Test + @SuppressWarnings("ThrowableNotThrown") + public void assertionHandlingTest() { + ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0); + IgniteTypeFactory tf = ctx.getTypeFactory(); + RelDataType rowType = TypeUtils.createRowType(tf, int.class, String.class); + + CorruptedNode<Object[]> node = new CorruptedNode<>(); + + RootNode<Object[]> root = new RootNode<>(ctx, rowType); + root.register(node); + + Thread watchDog = new Thread(() -> { + try { + U.sleep(5_000); + } + catch (IgniteInterruptedCheckedException ignored) { + } + + if (!root.isClosed()) + root.close(); + }, "test-watchdog"); + + watchDog.start(); + + GridTestUtils.assertThrowsWithCause(root::hasNext, AssertionError.class); + + watchDog.interrupt(); + } + + /** + * */ private Object[] row(Object... fields) { @@ -1082,4 +1117,54 @@ public class ExecutionTest extends AbstractExecutionTest { } }; } + + /** + * Node that always throws assertion error except for {@link #close()} + * and {@link #onRegister(Downstream)} methods. 
+ */ + static class CorruptedNode<T> implements Node<T> { + /** {@inheritDoc} */ + @Override public ExecutionContext<T> context() { + throw new AssertionError(); + } + + /** {@inheritDoc} */ + @Override public RelDataType rowType() { + throw new AssertionError(); + } + + /** {@inheritDoc} */ + @Override public Downstream<T> downstream() { + throw new AssertionError(); + } + + /** {@inheritDoc} */ + @Override public void register(List<Node<T>> sources) { + throw new AssertionError(); + } + + /** {@inheritDoc} */ + @Override public List<Node<T>> sources() { + throw new AssertionError(); + } + + /** {@inheritDoc} */ + @Override public void onRegister(Downstream<T> downstream) { + + } + + /** {@inheritDoc} */ + @Override public void request(int rowsCnt) throws Exception { + throw new AssertionError(); + } + + /** {@inheritDoc} */ + @Override public void rewind() { + throw new AssertionError(); + } + + /** {@inheritDoc} */ + @Override public void close() throws Exception { + } + } } diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java index 8f8a74e..f20deb5 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java @@ -1366,7 +1366,9 @@ public class PlannerTest extends AbstractPlannerTest { assert exec instanceof Outbox; - exec.context().execute(((Outbox<Object[]>) exec)::init); + Outbox<Object[]> outbox = (Outbox<Object[]>) exec; + + exec.context().execute(outbox::init, outbox::onError); ArrayList<Object[]> res = new ArrayList<>(); @@ -1625,7 +1627,9 @@ public class PlannerTest extends AbstractPlannerTest { assert exec instanceof Outbox; - exec.context().execute(((Outbox<Object[]>) exec)::init); + Outbox<Object[]> outbox = 
(Outbox<Object[]>) exec; + + exec.context().execute(outbox::init, outbox::onError); ArrayList<Object[]> res = new ArrayList<>(); @@ -2650,7 +2654,7 @@ public class PlannerTest extends AbstractPlannerTest { assertNotNull(phys); assertEquals("" + - "IgniteMergeJoin(condition=[AND(=($0, $4), =($3, $1))], joinType=[inner])\n" + + "IgniteMergeJoin(condition=[AND(=($0, $4), =($3, $1))], joinType=[inner], leftCollation=[[0, 1]], rightCollation=[[2, 1]])\n" + " IgniteIndexScan(table=[[PUBLIC, DEPT]], index=[dep_idx])\n" + " IgniteIndexScan(table=[[PUBLIC, EMP]], index=[emp_idx])\n", RelOptUtil.toString(phys)); diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java index 32197f2..c5a9b63 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java +++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java @@ -18,6 +18,7 @@ package org.apache.ignite.testsuites; import org.apache.ignite.internal.processors.query.calcite.CalciteBasicSecondaryIndexIntegrationTest; +import org.apache.ignite.internal.processors.query.calcite.CalciteErrorHandlilngIntegrationTest; import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessorTest; import org.apache.ignite.internal.processors.query.calcite.CancelTest; import org.apache.ignite.internal.processors.query.calcite.DateTimeTest; @@ -44,6 +45,7 @@ import org.junit.runners.Suite; ClosableIteratorsHolderTest.class, ContinuousExecutionTest.class, CalciteQueryProcessorTest.class, + CalciteErrorHandlilngIntegrationTest.class, JdbcQueryTest.class, CalciteBasicSecondaryIndexIntegrationTest.class, CancelTest.class, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index 
0e1f602..49ab287 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -247,7 +247,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { /** h2 redirection stub. */ public static final Pattern H2_REDIRECTION_RULES = - Pattern.compile("\\s*(create\\s*table|drop\\s*table|alter\\s*table)", CASE_INSENSITIVE); + Pattern.compile("\\s*((create|drop)\\s*(table|index)|alter\\s*table)", CASE_INSENSITIVE); /** @see IgniteSystemProperties#IGNITE_EXPERIMENTAL_SQL_ENGINE */ public static final boolean DFLT_IGNITE_EXPERIMENTAL_SQL_ENGINE = false; diff --git a/modules/core/src/test/config/log4j-test.xml b/modules/core/src/test/config/log4j-test.xml index e82c103..91a8919 100755 --- a/modules/core/src/test/config/log4j-test.xml +++ b/modules/core/src/test/config/log4j-test.xml @@ -133,6 +133,10 @@ --> <!-- Disable all open source debugging. --> +<!-- <category name="org.apache.calcite">--> +<!-- <level value="DEBUG"/>--> +<!-- </category>--> + <category name="org"> <level value="INFO"/> </category>