This is an automated email from the ASF dual-hosted git repository.
gvvinblade pushed a commit to branch ignite-12248
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-12248 by this push:
new 2f7e98a fix exception handling on root node
2f7e98a is described below
commit 2f7e98ae80a2e6bb4002fe2a2412507e8d67f958
Author: Igor Seliverstov <[email protected]>
AuthorDate: Mon Sep 14 17:13:05 2020 +0300
fix exception handling on root node
---
.../query/calcite/exec/ExecutionServiceImpl.java | 2 +-
.../query/calcite/exec/rel/AbstractNode.java | 36 ++++++++++++----------
.../query/calcite/exec/rel/AggregateNode.java | 2 +-
.../exec/rel/CorrelatedNestedLoopJoinNode.java | 2 +-
.../query/calcite/exec/rel/FilterNode.java | 2 +-
.../processors/query/calcite/exec/rel/Inbox.java | 10 ++----
.../query/calcite/exec/rel/ModifyNode.java | 2 +-
.../query/calcite/exec/rel/NestedLoopJoinNode.java | 26 ++++++++--------
.../processors/query/calcite/exec/rel/Node.java | 2 +-
.../processors/query/calcite/exec/rel/Outbox.java | 8 ++---
.../query/calcite/exec/rel/ProjectNode.java | 2 +-
.../query/calcite/exec/rel/RootNode.java | 8 ++---
.../query/calcite/exec/rel/ScanNode.java | 6 ++--
.../query/calcite/exec/rel/SortNode.java | 2 +-
.../query/calcite/exec/rel/UnionAllNode.java | 2 +-
15 files changed, 56 insertions(+), 56 deletions(-)
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
index 99b0ea5..6a1966e 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
@@ -1028,7 +1028,7 @@ public class ExecutionServiceImpl<Row> extends
AbstractService implements Execut
running.remove(ctx.queryId());
// 2) close local fragment
- root.onClose();
+ root.closeInternal();
// 3) close remote fragments
for (UUID nodeId : remotes) {
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java
index c47321f..f021516 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java
@@ -103,7 +103,7 @@ public abstract class AbstractNode<Row> implements
Node<Row> {
if (isClosed())
return;
- onClose();
+ closeInternal();
if (!F.isEmpty(sources()))
sources().forEach(U::closeQuiet);
@@ -111,7 +111,7 @@ public abstract class AbstractNode<Row> implements
Node<Row> {
/** {@inheritDoc} */
@Override public void rewind() {
- onRewind();
+ rewindInternal();
if (!F.isEmpty(sources()))
sources().forEach(Node::rewind);
@@ -122,36 +122,40 @@ public abstract class AbstractNode<Row> implements
Node<Row> {
this.downstream = downstream;
}
- /** */
- protected abstract void onRewind();
-
/**
* Processes given exception.
*
* @param e Exception.
*/
public void onError(Throwable e) {
- assert downstream() != null;
-
- if (e instanceof ExecutionCancelledException) {
+ if (e instanceof ExecutionCancelledException)
U.warn(context().planningContext().logger(), "Execution is
cancelled.", e);
+ else
+ onErrorInternal(e);
+ }
- return;
- }
+ /** */
+ protected void closeInternal() {
+ closed = true;
+ }
+
+ /** */
+ protected abstract void rewindInternal();
+
+ /** */
+ protected void onErrorInternal(Throwable e) {
+ Downstream<Row> downstream = downstream();
+
+ assert downstream != null;
try {
- downstream().onError(e);
+ downstream.onError(e);
}
finally {
U.closeQuiet(this);
}
}
- /** */
- protected void onClose() {
- closed = true;
- }
-
/**
* @return {@code true} if the subtree is canceled.
*/
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AggregateNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AggregateNode.java
index 8d300b6..80911e0 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AggregateNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AggregateNode.java
@@ -157,7 +157,7 @@ public class AggregateNode<Row> extends AbstractNode<Row>
implements SingleNode<
}
/** {@inheritDoc} */
- @Override protected void onRewind() {
+ @Override protected void rewindInternal() {
requested = 0;
waiting = 0;
groupings.forEach(grouping -> grouping.groups.clear());
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java
index c88a9a8..d102be1 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java
@@ -113,7 +113,7 @@ public class CorrelatedNestedLoopJoinNode<Row> extends
AbstractNode<Row> {
}
/** {@inheritDoc} */
- @Override protected void onRewind() {
+ @Override protected void rewindInternal() {
leftInBuf = null;
rightInBuf = null;
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/FilterNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/FilterNode.java
index ba8f5fe..06703eb 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/FilterNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/FilterNode.java
@@ -118,7 +118,7 @@ public class FilterNode<Row> extends AbstractNode<Row>
implements SingleNode<Row
}
/** {@inheritDoc} */
- @Override protected void onRewind() {
+ @Override protected void rewindInternal() {
requested = 0;
waiting = 0;
inBuf.clear();
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java
index 58c084e..3ea53fc 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java
@@ -137,8 +137,8 @@ public class Inbox<Row> extends AbstractNode<Row>
implements Mailbox<Row>, Singl
}
/** {@inheritDoc} */
- @Override public void onClose() {
- super.onClose();
+ @Override public void closeInternal() {
+ super.closeInternal();
registry.unregister(this);
}
@@ -154,7 +154,7 @@ public class Inbox<Row> extends AbstractNode<Row>
implements Mailbox<Row>, Singl
}
/** {@inheritDoc} */
- @Override protected void onRewind() {
+ @Override protected void rewindInternal() {
throw new UnsupportedOperationException();
}
@@ -168,8 +168,6 @@ public class Inbox<Row> extends AbstractNode<Row>
implements Mailbox<Row>, Singl
*/
public void onBatchReceived(UUID src, int batchId, boolean last, List<Row>
rows) {
try {
- checkState();
-
Buffer buf = getOrCreateBuffer(src);
boolean waitingBefore = buf.check() == State.WAITING;
@@ -187,8 +185,6 @@ public class Inbox<Row> extends AbstractNode<Row>
implements Mailbox<Row>, Singl
/** */
private void doPush() {
try {
- checkState();
-
push();
}
catch (Exception e) {
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java
index 5336481..a89255f 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java
@@ -157,7 +157,7 @@ public class ModifyNode<Row> extends AbstractNode<Row>
implements SingleNode<Row
}
/** {@inheritDoc} */
- @Override protected void onRewind() {
+ @Override protected void rewindInternal() {
throw new UnsupportedOperationException();
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java
index c65dc68..9a90d36 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java
@@ -103,7 +103,7 @@ public abstract class NestedLoopJoinNode<Row> extends
AbstractNode<Row> {
}
/** {@inheritDoc} */
- @Override protected void onRewind() {
+ @Override protected void rewindInternal() {
requested = 0;
waitingLeft = 0;
waitingRight = 0;
@@ -292,11 +292,11 @@ public abstract class NestedLoopJoinNode<Row> extends
AbstractNode<Row> {
}
/** {@inheritDoc} */
- @Override protected void onRewind() {
+ @Override protected void rewindInternal() {
left = null;
rightIdx = 0;
- super.onRewind();
+ super.rewindInternal();
}
/** */
@@ -369,12 +369,12 @@ public abstract class NestedLoopJoinNode<Row> extends
AbstractNode<Row> {
}
/** */
- @Override protected void onRewind() {
+ @Override protected void rewindInternal() {
matched = false;
left = null;
rightIdx = 0;
- super.onRewind();
+ super.rewindInternal();
}
/** {@inheritDoc} */
@@ -465,13 +465,13 @@ public abstract class NestedLoopJoinNode<Row> extends
AbstractNode<Row> {
}
/** {@inheritDoc} */
- @Override protected void onRewind() {
+ @Override protected void rewindInternal() {
left = null;
rightNotMatchedIndexes.clear();
lastPushedInd = 0;
rightIdx = 0;
- super.onRewind();
+ super.rewindInternal();
}
/** {@inheritDoc} */
@@ -593,14 +593,14 @@ public abstract class NestedLoopJoinNode<Row> extends
AbstractNode<Row> {
}
/** {@inheritDoc} */
- @Override protected void onRewind() {
+ @Override protected void rewindInternal() {
left = null;
leftMatched = false;
rightNotMatchedIndexes.clear();
lastPushedInd = 0;
rightIdx = 0;
- super.onRewind();
+ super.rewindInternal();
}
/** {@inheritDoc} */
@@ -718,11 +718,11 @@ public abstract class NestedLoopJoinNode<Row> extends
AbstractNode<Row> {
}
/** {@inheritDoc} */
- @Override protected void onRewind() {
+ @Override protected void rewindInternal() {
left = null;
rightIdx = 0;
- super.onRewind();
+ super.rewindInternal();
}
/** {@inheritDoc} */
@@ -786,11 +786,11 @@ public abstract class NestedLoopJoinNode<Row> extends
AbstractNode<Row> {
}
/** */
- @Override protected void onRewind() {
+ @Override protected void rewindInternal() {
left = null;
rightIdx = 0;
- super.onRewind();
+ super.rewindInternal();
}
/** {@inheritDoc} */
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Node.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Node.java
index fb1c1c1..d0d8e51 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Node.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Node.java
@@ -24,7 +24,7 @@ import
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext
/**
* Represents a node of execution tree.
*
- * <p/><b>Note</b>: except several cases (like consumer node and mailboxes),
{@link Node#request(int)}, {@link Node#cancel()},
+ * <p/><b>Note</b>: except several cases (like consumer node and mailboxes),
{@link Node#request(int)}, {@link Node#close()},
* {@link Downstream#push(Object)} and {@link Downstream#end()} methods should
be used from one single thread.
*/
public interface Node<Row> extends AutoCloseable {
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java
index 25a4673..35e61f9 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java
@@ -165,7 +165,7 @@ public class Outbox<Row> extends AbstractNode<Row>
implements Mailbox<Row>, Sing
}
/** {@inheritDoc} */
- @Override public void onError(Throwable e) {
+ @Override protected void onErrorInternal(Throwable e) {
U.error(context().planningContext().logger(),
"Error occurred during execution: " + X.getFullStackTrace(e));
@@ -182,8 +182,8 @@ public class Outbox<Row> extends AbstractNode<Row>
implements Mailbox<Row>, Sing
}
/** {@inheritDoc} */
- @Override public void onClose() {
- super.onClose();
+ @Override public void closeInternal() {
+ super.closeInternal();
registry.unregister(this);
@@ -198,7 +198,7 @@ public class Outbox<Row> extends AbstractNode<Row>
implements Mailbox<Row>, Sing
}
/** {@inheritDoc} */
- @Override protected void onRewind() {
+ @Override protected void rewindInternal() {
throw new UnsupportedOperationException();
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ProjectNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ProjectNode.java
index 5f3205e..dec2710 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ProjectNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ProjectNode.java
@@ -40,7 +40,7 @@ public class ProjectNode<Row> extends AbstractNode<Row>
implements SingleNode<Ro
}
/** {@inheritDoc} */
- @Override protected void onRewind() {
+ @Override protected void rewindInternal() {
// No-op.
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
index 825fe42..f38c613 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
@@ -104,7 +104,7 @@ public class RootNode<Row> extends AbstractNode<Row>
implements SingleNode<Row>,
}
if (onClose == null)
- onClose();
+ closeInternal();
else
onClose.run();
}
@@ -115,7 +115,7 @@ public class RootNode<Row> extends AbstractNode<Row>
implements SingleNode<Row>,
}
/** {@inheritDoc} */
- @Override public void onClose() {
+ @Override public void closeInternal() {
context().execute(() -> {
buff.clear();
@@ -184,7 +184,7 @@ public class RootNode<Row> extends AbstractNode<Row>
implements SingleNode<Row>,
}
/** {@inheritDoc} */
- @Override public void onError(Throwable e) {
+ @Override protected void onErrorInternal(Throwable e) {
if (!ex.compareAndSet(null, e))
ex.get().addSuppressed(e);
@@ -226,7 +226,7 @@ public class RootNode<Row> extends AbstractNode<Row>
implements SingleNode<Row>,
}
/** {@inheritDoc} */
- @Override protected void onRewind() {
+ @Override protected void rewindInternal() {
throw new UnsupportedOperationException();
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
index 3e82b63..7576c02 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
@@ -80,8 +80,8 @@ public class ScanNode<Row> extends AbstractNode<Row>
implements SingleNode<Row>
}
/** {@inheritDoc} */
- @Override public void onClose() {
- super.onClose();
+ @Override public void closeInternal() {
+ super.closeInternal();
Commons.closeQuiet(it);
it = null;
@@ -89,7 +89,7 @@ public class ScanNode<Row> extends AbstractNode<Row>
implements SingleNode<Row>
}
/** {@inheritDoc} */
- @Override protected void onRewind() {
+ @Override protected void rewindInternal() {
Commons.closeQuiet(it);
it = null;
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java
index 5c686fc..3ab1148 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java
@@ -50,7 +50,7 @@ public class SortNode<Row> extends AbstractNode<Row>
implements SingleNode<Row>,
}
/** {@inheritDoc} */
- @Override protected void onRewind() {
+ @Override protected void rewindInternal() {
requested = 0;
waiting = 0;
rows.clear();
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/UnionAllNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/UnionAllNode.java
index 9935b68..0d1a6d3 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/UnionAllNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/UnionAllNode.java
@@ -98,7 +98,7 @@ public class UnionAllNode<Row> extends AbstractNode<Row>
implements Downstream<R
}
/** {@inheritDoc} */
- @Override protected void onRewind() {
+ @Override protected void rewindInternal() {
curSrc = 0;
waiting = 0;
}