This is an automated email from the ASF dual-hosted git repository.

gvvinblade pushed a commit to branch ignite-12248
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-12248 by this push:
     new e9d229a  row types + better cancel
e9d229a is described below

commit e9d229a7beb08a7e747cb08499f4d5c339632496
Author: Igor Seliverstov <[email protected]>
AuthorDate: Fri Oct 16 18:22:06 2020 +0300

    row types + better cancel
---
 .../query/calcite/exec/ExchangeServiceImpl.java    |   9 +-
 .../query/calcite/exec/ExecutionContext.java       |  21 +-
 .../query/calcite/exec/ExecutionServiceImpl.java   |  15 +-
 .../query/calcite/exec/LogicalRelImplementor.java  |  70 ++--
 .../query/calcite/exec/rel/AbstractNode.java       |  22 +-
 .../query/calcite/exec/rel/AggregateNode.java      |   6 +-
 .../exec/rel/CorrelatedNestedLoopJoinNode.java     |   6 +-
 .../query/calcite/exec/rel/FilterNode.java         |   6 +-
 .../processors/query/calcite/exec/rel/Inbox.java   |   9 +-
 .../query/calcite/exec/rel/ModifyNode.java         |   6 +-
 .../query/calcite/exec/rel/NestedLoopJoinNode.java |  43 ++-
 .../processors/query/calcite/exec/rel/Node.java    |   9 +-
 .../processors/query/calcite/exec/rel/Outbox.java  |   4 +-
 .../query/calcite/exec/rel/ProjectNode.java        |   6 +-
 .../query/calcite/exec/rel/RootNode.java           |   9 +-
 .../query/calcite/exec/rel/ScanNode.java           |   6 +-
 .../query/calcite/exec/rel/SortNode.java           |   6 +-
 .../query/calcite/exec/rel/UnionAllNode.java       |   5 +-
 .../query/calcite/rel/AbstractIndexScan.java       |   2 +-
 .../processors/query/calcite/PlannerTest.java      |   4 +-
 .../calcite/exec/rel/ContinuousExecutionTest.java  |  21 +-
 .../query/calcite/exec/rel/ExecutionTest.java      | 376 ++++++++++-----------
 22 files changed, 362 insertions(+), 299 deletions(-)

diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
index fc2879b..f8d6a10 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
@@ -21,7 +21,6 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Objects;
 import java.util.UUID;
-
 import com.google.common.collect.ImmutableMap;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.GridKernalContext;
@@ -158,8 +157,10 @@ public class ExchangeServiceImpl extends AbstractService 
implements ExchangeServ
     protected void onMessage(UUID nodeId, InboxCloseMessage msg) {
         Collection<Inbox<?>> inboxes = 
mailboxRegistry().inboxes(msg.queryId(), msg.fragmentId(), msg.exchangeId());
         if (!F.isEmpty(inboxes)) {
-            for (Inbox<?> inbox : inboxes)
+            for (Inbox<?> inbox : inboxes) {
+                inbox.context().cancel();
                 inbox.context().execute(inbox::close);
+            }
         }
         else if (log.isDebugEnabled()) {
             log.debug("Stale inbox cancel message received: [" +
@@ -174,8 +175,10 @@ public class ExchangeServiceImpl extends AbstractService 
implements ExchangeServ
     protected void onMessage(UUID nodeId, OutboxCloseMessage msg) {
         Collection<Outbox<?>> outboxes = 
mailboxRegistry().outboxes(msg.queryId(), msg.fragmentId(), msg.exchangeId());
         if (!F.isEmpty(outboxes)) {
-            for (Outbox<?> outbox : outboxes)
+            for (Outbox<?> outbox : outboxes) {
+                outbox.context().cancel();
                 outbox.context().execute(outbox::close);
+            }
         }
         else if (log.isDebugEnabled()) {
             log.debug("Stale oubox cancel message received: [" +
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
index c9a355b..e64fa74 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
@@ -20,7 +20,7 @@ package 
org.apache.ignite.internal.processors.query.calcite.exec;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.calcite.DataContext;
 import org.apache.calcite.linq4j.QueryProvider;
 import org.apache.calcite.schema.SchemaPlus;
@@ -62,6 +62,9 @@ public class ExecutionContext<Row> implements DataContext {
     private final ExpressionFactory<Row> expressionFactory;
 
     /** */
+    private final AtomicBoolean cancelFlag = new AtomicBoolean();
+
+    /** */
     private Object[] correlations = new Object[16];
 
     /**
@@ -191,6 +194,9 @@ public class ExecutionContext<Row> implements DataContext {
 
     /** {@inheritDoc} */
     @Override public Object get(String name) {
+        if (Variable.CANCEL_FLAG.camelName.equals(name))
+            return cancelFlag;
+
         return params.get(name);
     }
 
@@ -226,4 +232,17 @@ public class ExecutionContext<Row> implements DataContext {
     public void execute(Runnable task) {
         executor.execute(qryId, fragmentId(), task);
     }
+
+    /**
+     * Sets cancel flag, returns {@code true} if flag was changed by this call.
+     *
+     * @return {@code True} if flag was changed by this call.
+     */
+    public boolean cancel() {
+        return !cancelFlag.get() && cancelFlag.compareAndSet(false, true);
+    }
+
+    public boolean isCancelled() {
+        return cancelFlag.get();
+    }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
index 59d6d49..ba9e1c2 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
@@ -959,7 +959,7 @@ public class ExecutionServiceImpl<Row> extends 
AbstractService implements Execut
         private QueryInfo(ExecutionContext<Row> ctx, MultiStepPlan plan, 
Node<Row> root) {
             this.ctx = ctx;
 
-            RootNode<Row> rootNode = new RootNode<>(ctx, this::tryClose);
+            RootNode<Row> rootNode = new RootNode<>(ctx, root.rowType(), 
this::tryClose);
             rootNode.register(root);
 
             this.root = rootNode;
@@ -1003,18 +1003,21 @@ public class ExecutionServiceImpl<Row> extends 
AbstractService implements Execut
                 if (state == QueryState.RUNNING)
                     state0 = state = QueryState.CLOSING;
 
+                // 1) Cancel local fragment
+                ctx.cancel();
+
+                // 2) close local fragment
+                root.closeInternal();
+
                 if (state == QueryState.CLOSING && waiting.isEmpty())
                     state0 = state = QueryState.CLOSED;
             }
 
             if (state0 == QueryState.CLOSED) {
-                // 1) unregister runing query
+                // 3) unregister runing query
                 running.remove(ctx.queryId());
 
-                // 2) close local fragment
-                root.closeInternal();
-
-                // 3) close remote fragments
+                // 4) close remote fragments
                 for (UUID nodeId : remotes) {
                     try {
                         exchangeService().closeOutbox(nodeId, ctx.queryId(), 
-1, -1);
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
index 0b2a62c..75d9479 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
@@ -129,7 +129,7 @@ public class LogicalRelImplementor<Row> implements 
IgniteRelVisitor<Node<Row>> {
 
         // Outbox fragment ID is used as exchange ID as well.
         Outbox<Row> outbox =
-            new Outbox<>(ctx, exchangeSvc, mailboxRegistry, rel.exchangeId(), 
rel.targetFragmentId(), dest);
+            new Outbox<>(ctx, rel.getRowType(), exchangeSvc, mailboxRegistry, 
rel.exchangeId(), rel.targetFragmentId(), dest);
 
         Node<Row> input = visit(rel.getInput());
 
@@ -144,7 +144,7 @@ public class LogicalRelImplementor<Row> implements 
IgniteRelVisitor<Node<Row>> {
     @Override public Node<Row> visit(IgniteFilter rel) {
         Predicate<Row> pred = expressionFactory.predicate(rel.getCondition(), 
rel.getRowType());
 
-        FilterNode<Row> node = new FilterNode<>(ctx, pred);
+        FilterNode<Row> node = new FilterNode<>(ctx, rel.getRowType(), pred);
 
         Node<Row> input = visit(rel.getInput());
 
@@ -155,7 +155,7 @@ public class LogicalRelImplementor<Row> implements 
IgniteRelVisitor<Node<Row>> {
 
     /** {@inheritDoc} */
     @Override public Node<Row> visit(IgniteTrimExchange rel) {
-        FilterNode<Row> node = new FilterNode<>(ctx, 
partitionFilter(rel.distribution()));
+        FilterNode<Row> node = new FilterNode<>(ctx, rel.getRowType(), 
partitionFilter(rel.distribution()));
 
         Node<Row> input = visit(rel.getInput());
 
@@ -168,7 +168,7 @@ public class LogicalRelImplementor<Row> implements 
IgniteRelVisitor<Node<Row>> {
     @Override public Node<Row> visit(IgniteProject rel) {
         Function<Row, Row> prj = expressionFactory.project(rel.getProjects(), 
rel.getInput().getRowType());
 
-        ProjectNode<Row> node = new ProjectNode<>(ctx, prj);
+        ProjectNode<Row> node = new ProjectNode<>(ctx, rel.getRowType(), prj);
 
         Node<Row> input = visit(rel.getInput());
 
@@ -179,6 +179,7 @@ public class LogicalRelImplementor<Row> implements 
IgniteRelVisitor<Node<Row>> {
 
     /** {@inheritDoc} */
     @Override public Node<Row> visit(IgniteNestedLoopJoin rel) {
+        RelDataType outType = rel.getRowType();
         RelDataType leftType = rel.getLeft().getRowType();
         RelDataType rightType = rel.getRight().getRowType();
         JoinRelType joinType = rel.getJoinType();
@@ -186,7 +187,7 @@ public class LogicalRelImplementor<Row> implements 
IgniteRelVisitor<Node<Row>> {
         RelDataType rowType = combinedRowType(ctx.getTypeFactory(), leftType, 
rightType);
         Predicate<Row> cond = expressionFactory.predicate(rel.getCondition(), 
rowType);
 
-        Node<Row> node = NestedLoopJoinNode.create(ctx, leftType, rightType, 
joinType, cond);
+        Node<Row> node = NestedLoopJoinNode.create(ctx, outType, leftType, 
rightType, joinType, cond);
 
         Node<Row> leftInput = visit(rel.getLeft());
         Node<Row> rightInput = visit(rel.getRight());
@@ -198,13 +199,16 @@ public class LogicalRelImplementor<Row> implements 
IgniteRelVisitor<Node<Row>> {
 
     /** {@inheritDoc} */
     @Override public Node<Row> visit(IgniteCorrelatedNestedLoopJoin rel) {
-        RelDataType rowType = combinedRowType(ctx.getTypeFactory(), 
rel.getLeft().getRowType(), rel.getRight().getRowType());
+        RelDataType outType = rel.getRowType();
+        RelDataType leftType = rel.getLeft().getRowType();
+        RelDataType rightType = rel.getRight().getRowType();
 
+        RelDataType rowType = combinedRowType(ctx.getTypeFactory(), leftType, 
rightType);
         Predicate<Row> cond = expressionFactory.predicate(rel.getCondition(), 
rowType);
 
         assert rel.getJoinType() == JoinRelType.INNER; // TODO LEFT, SEMI, ANTI
 
-        Node<Row> node = new CorrelatedNestedLoopJoinNode<>(ctx, cond, 
rel.getVariablesSet());
+        Node<Row> node = new CorrelatedNestedLoopJoinNode<>(ctx, outType, 
cond, rel.getVariablesSet());
 
         Node<Row> leftInput = visit(rel.getLeft());
         Node<Row> rightInput = visit(rel.getRight());
@@ -226,17 +230,17 @@ public class LogicalRelImplementor<Row> implements 
IgniteRelVisitor<Node<Row>> {
         List<RexNode> lowerCond = rel.lowerBound();
         List<RexNode> upperCond = rel.upperBound();
 
-        RelDataType cols = tbl.getRowType(typeFactory, requiredColunms);
+        RelDataType rowType = tbl.getRowType(typeFactory, requiredColunms);
 
-        Predicate<Row> filters = condition == null ? null : 
expressionFactory.predicate(condition, cols);
+        Predicate<Row> filters = condition == null ? null : 
expressionFactory.predicate(condition, rowType);
         Supplier<Row> lower = lowerCond == null ? null : 
expressionFactory.rowSource(lowerCond);
         Supplier<Row> upper = upperCond == null ? null : 
expressionFactory.rowSource(upperCond);
-        Function<Row, Row> prj = projects == null ? null : 
expressionFactory.project(projects, cols);
+        Function<Row, Row> prj = projects == null ? null : 
expressionFactory.project(projects, rowType);
 
         IgniteIndex idx = tbl.getIndex(rel.indexName());
         Iterable<Row> rowsIter = idx.scan(ctx, filters, lower, upper, prj, 
requiredColunms);
 
-        return new ScanNode<>(ctx, rowsIter);
+        return new ScanNode<>(ctx, rowType, rowsIter);
     }
 
     /** {@inheritDoc} */
@@ -248,26 +252,28 @@ public class LogicalRelImplementor<Row> implements 
IgniteRelVisitor<Node<Row>> {
         IgniteTable tbl = rel.getTable().unwrap(IgniteTable.class);
         IgniteTypeFactory typeFactory = ctx.getTypeFactory();
 
-        RelDataType cols = tbl.getRowType(typeFactory, requiredColunms);
+        RelDataType rowType = tbl.getRowType(typeFactory, requiredColunms);
 
-        Predicate<Row> filters = condition == null ? null : 
expressionFactory.predicate(condition, cols);
-        Function<Row, Row> prj = projects == null ? null : 
expressionFactory.project(projects, cols);
+        Predicate<Row> filters = condition == null ? null : 
expressionFactory.predicate(condition, rowType);
+        Function<Row, Row> prj = projects == null ? null : 
expressionFactory.project(projects, rowType);
 
         Iterable<Row> rowsIter = tbl.scan(ctx, filters, prj, requiredColunms);
 
-        return new ScanNode<>(ctx, rowsIter);
+        return new ScanNode<>(ctx, rowType, rowsIter);
     }
 
     /** {@inheritDoc} */
     @Override public Node<Row> visit(IgniteValues rel) {
         List<RexLiteral> vals = Commons.flat(Commons.cast(rel.getTuples()));
 
-        return new ScanNode<>(ctx, expressionFactory.values(vals, 
rel.getRowType()));
+        RelDataType rowType = rel.getRowType();
+
+        return new ScanNode<>(ctx, rowType, expressionFactory.values(vals, 
rowType));
     }
 
     /** {@inheritDoc} */
     @Override public Node<Row> visit(IgniteUnionAll rel) {
-        UnionAllNode<Row> node = new UnionAllNode<>(ctx);
+        UnionAllNode<Row> node = new UnionAllNode<>(ctx, rel.getRowType());
 
         List<Node<Row>> inputs = Commons.transform(rel.getInputs(), 
this::visit);
 
@@ -280,7 +286,7 @@ public class LogicalRelImplementor<Row> implements 
IgniteRelVisitor<Node<Row>> {
     @Override public Node<Row> visit(IgniteSort rel) {
         RelCollation collation = rel.getCollation();
 
-        SortNode<Row> node = new SortNode<>(ctx, 
expressionFactory.comparator(collation));
+        SortNode<Row> node = new SortNode<>(ctx, rel.getRowType(), 
expressionFactory.comparator(collation));
 
         Node<Row> input = visit(rel.getInput());
 
@@ -295,7 +301,7 @@ public class LogicalRelImplementor<Row> implements 
IgniteRelVisitor<Node<Row>> {
             case INSERT:
             case UPDATE:
             case DELETE:
-                ModifyNode<Row> node = new ModifyNode<>(ctx, 
rel.getTable().unwrap(TableDescriptor.class),
+                ModifyNode<Row> node = new ModifyNode<>(ctx, rel.getRowType(), 
rel.getTable().unwrap(TableDescriptor.class),
                     rel.getOperation(), rel.getUpdateColumnList());
 
                 Node<Row> input = visit(rel.getInput());
@@ -317,7 +323,7 @@ public class LogicalRelImplementor<Row> implements 
IgniteRelVisitor<Node<Row>> {
 
         // here may be an already created (to consume rows from remote nodes) 
inbox
         // without proper context, we need to init it with a right one.
-        inbox.init(ctx, ctx.remoteSources(rel.exchangeId()), 
expressionFactory.comparator(rel.collation()));
+        inbox.init(ctx, rel.getRowType(), ctx.remoteSources(rel.exchangeId()), 
expressionFactory.comparator(rel.collation()));
 
         return inbox;
     }
@@ -326,11 +332,14 @@ public class LogicalRelImplementor<Row> implements 
IgniteRelVisitor<Node<Row>> {
     @Override public Node<Row> visit(IgniteAggregate rel) {
         AggregateNode.AggregateType type = AggregateNode.AggregateType.SINGLE;
 
+        RelDataType rowType = rel.getRowType();
+        RelDataType inputType = rel.getInput().getRowType();
+
         Supplier<List<AccumulatorWrapper<Row>>> accFactory = 
expressionFactory.accumulatorsFactory(
-            type, rel.getAggCallList(), rel.getInput().getRowType());
-        RowFactory<Row> rowFactory = 
ctx.rowHandler().factory(ctx.getTypeFactory(), rel.getRowType());
+            type, rel.getAggCallList(), inputType);
+        RowFactory<Row> rowFactory = 
ctx.rowHandler().factory(ctx.getTypeFactory(), rowType);
 
-        AggregateNode<Row> node = new AggregateNode<>(ctx, type, 
rel.getGroupSets(), accFactory, rowFactory);
+        AggregateNode<Row> node = new AggregateNode<>(ctx, rowType, type, 
rel.getGroupSets(), accFactory, rowFactory);
 
         Node<Row> input = visit(rel.getInput());
 
@@ -343,11 +352,14 @@ public class LogicalRelImplementor<Row> implements 
IgniteRelVisitor<Node<Row>> {
     @Override public Node<Row> visit(IgniteMapAggregate rel) {
         AggregateNode.AggregateType type = AggregateNode.AggregateType.MAP;
 
+        RelDataType rowType = rel.getRowType();
+        RelDataType inputType = rel.getInput().getRowType();
+
         Supplier<List<AccumulatorWrapper<Row>>> accFactory = 
expressionFactory.accumulatorsFactory(
-            type, rel.getAggCallList(), rel.getInput().getRowType());
-        RowFactory<Row> rowFactory = 
ctx.rowHandler().factory(ctx.getTypeFactory(), rel.getRowType());
+            type, rel.getAggCallList(), inputType);
+        RowFactory<Row> rowFactory = 
ctx.rowHandler().factory(ctx.getTypeFactory(), rowType);
 
-        AggregateNode<Row> node = new AggregateNode<>(ctx, type, 
rel.getGroupSets(), accFactory, rowFactory);
+        AggregateNode<Row> node = new AggregateNode<>(ctx, rowType, type, 
rel.getGroupSets(), accFactory, rowFactory);
 
         Node<Row> input = visit(rel.getInput());
 
@@ -360,11 +372,13 @@ public class LogicalRelImplementor<Row> implements 
IgniteRelVisitor<Node<Row>> {
     @Override public Node<Row> visit(IgniteReduceAggregate rel) {
         AggregateNode.AggregateType type = AggregateNode.AggregateType.REDUCE;
 
+        RelDataType rowType = rel.getRowType();
+
         Supplier<List<AccumulatorWrapper<Row>>> accFactory = 
expressionFactory.accumulatorsFactory(
             type, rel.aggregateCalls(), null);
-        RowFactory<Row> rowFactory = 
ctx.rowHandler().factory(ctx.getTypeFactory(), rel.getRowType());
+        RowFactory<Row> rowFactory = 
ctx.rowHandler().factory(ctx.getTypeFactory(), rowType);
 
-        AggregateNode<Row> node = new AggregateNode<>(ctx, type, 
rel.groupSets(), accFactory, rowFactory);
+        AggregateNode<Row> node = new AggregateNode<>(ctx, rowType, type, 
rel.groupSets(), accFactory, rowFactory);
 
         Node<Row> input = visit(rel.getInput());
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java
index f021516..ab410a6 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java
@@ -20,7 +20,7 @@ package 
org.apache.ignite.internal.processors.query.calcite.exec.rel;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.List;
-
+import org.apache.calcite.rel.type.RelDataType;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
@@ -53,7 +53,10 @@ public abstract class AbstractNode<Row> implements Node<Row> 
{
      * creates on first message received from a remote source. This case the 
context
      * sets in scope of {@link Inbox#init(ExecutionContext, Collection, 
Comparator)} method call.
      */
-    private volatile ExecutionContext<Row> ctx;
+    private ExecutionContext<Row> ctx;
+
+    /** */
+    private RelDataType rowType;
 
     /** */
     private Downstream<Row> downstream;
@@ -67,8 +70,9 @@ public abstract class AbstractNode<Row> implements Node<Row> {
     /**
      * @param ctx Execution context.
      */
-    protected AbstractNode(ExecutionContext<Row> ctx) {
+    protected AbstractNode(ExecutionContext<Row> ctx, RelDataType rowType) {
         this.ctx = ctx;
+        this.rowType = rowType;
     }
 
     /**
@@ -86,6 +90,16 @@ public abstract class AbstractNode<Row> implements Node<Row> 
{
     }
 
     /** {@inheritDoc} */
+    @Override public RelDataType rowType() {
+        return rowType;
+    }
+
+    /** */
+    protected void rowType(RelDataType rowType) {
+        this.rowType = rowType;
+    }
+
+    /** {@inheritDoc} */
     @Override public void register(List<Node<Row>> sources) {
         this.sources = sources;
 
@@ -165,7 +179,7 @@ public abstract class AbstractNode<Row> implements 
Node<Row> {
 
     /** */
     protected void checkState() throws IgniteCheckedException {
-        if (isClosed())
+        if (context().isCancelled())
             throw new ExecutionCancelledException();
         if (Thread.interrupted())
             throw new IgniteInterruptedCheckedException("Thread was 
interrupted.");
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AggregateNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AggregateNode.java
index 80911e0..7e242dd 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AggregateNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AggregateNode.java
@@ -25,7 +25,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Supplier;
-
+import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
@@ -72,9 +72,9 @@ public class AggregateNode<Row> extends AbstractNode<Row> 
implements SingleNode<
     /**
      * @param ctx Execution context.
      */
-    public AggregateNode(ExecutionContext<Row> ctx, AggregateType type, 
List<ImmutableBitSet> grpSets,
+    public AggregateNode(ExecutionContext<Row> ctx, RelDataType rowType, 
AggregateType type, List<ImmutableBitSet> grpSets,
         Supplier<List<AccumulatorWrapper<Row>>> accFactory, RowFactory<Row> 
rowFactory) {
-        super(ctx);
+        super(ctx, rowType);
 
         this.type = type;
         this.accFactory = accFactory;
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java
index d102be1..ca0d7a4 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java
@@ -22,8 +22,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import java.util.function.Predicate;
-
 import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.type.RelDataType;
 import org.apache.ignite.IgniteCheckedException;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
 import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
@@ -81,8 +81,8 @@ public class CorrelatedNestedLoopJoinNode<Row> extends 
AbstractNode<Row> {
      * @param ctx Execution context.
      * @param cond Join expression.
      */
-    public CorrelatedNestedLoopJoinNode(ExecutionContext<Row> ctx, 
Predicate<Row> cond, Set<CorrelationId> correlationIds) {
-        super(ctx);
+    public CorrelatedNestedLoopJoinNode(ExecutionContext<Row> ctx, RelDataType 
rowType, Predicate<Row> cond, Set<CorrelationId> correlationIds) {
+        super(ctx, rowType);
 
         assert !F.isEmpty(correlationIds);
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/FilterNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/FilterNode.java
index 06703eb..cbc0922 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/FilterNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/FilterNode.java
@@ -20,7 +20,7 @@ package 
org.apache.ignite.internal.processors.query.calcite.exec.rel;
 import java.util.ArrayDeque;
 import java.util.Deque;
 import java.util.function.Predicate;
-
+import org.apache.calcite.rel.type.RelDataType;
 import org.apache.ignite.IgniteCheckedException;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
 import org.apache.ignite.internal.util.typedef.F;
@@ -48,8 +48,8 @@ public class FilterNode<Row> extends AbstractNode<Row> 
implements SingleNode<Row
      * @param ctx Execution context.
      * @param pred Predicate.
      */
-    public FilterNode(ExecutionContext<Row> ctx, Predicate<Row> pred) {
-        super(ctx);
+    public FilterNode(ExecutionContext<Row> ctx, RelDataType rowType, 
Predicate<Row> pred) {
+        super(ctx, rowType);
 
         this.pred = pred;
     }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java
index 79e8e71..9c0c918 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.UUID;
 import java.util.stream.Collectors;
+import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.util.Pair;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
@@ -84,7 +85,7 @@ public class Inbox<Row> extends AbstractNode<Row> implements 
Mailbox<Row>, Singl
         long exchangeId,
         long srcFragmentId
     ) {
-        super(ctx);
+        super(ctx, ctx.getTypeFactory().createUnknownType());
         this.exchange = exchange;
         this.registry = registry;
 
@@ -106,11 +107,13 @@ public class Inbox<Row> extends AbstractNode<Row> 
implements Mailbox<Row>, Singl
      * @param srcNodeIds Source node IDs.
      * @param comp Optional comparator for merge exchange.
      */
-    public void init(ExecutionContext<Row> ctx, Collection<UUID> srcNodeIds, 
@Nullable Comparator<Row> comp) {
+    public void init(ExecutionContext<Row> ctx, RelDataType rowType, 
Collection<UUID> srcNodeIds, @Nullable Comparator<Row> comp) {
         // It's important to set proper context here because
-        // because the one, that is created on a first message
+        // the one, that is created on a first message
         // received doesn't have all context variables in place.
         context(ctx);
+        rowType(rowType);
+
         this.comp = comp;
 
         // memory barier
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java
index a89255f..5d253c3 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java
@@ -21,13 +21,12 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
-
 import javax.cache.processor.EntryProcessor;
 import javax.cache.processor.EntryProcessorException;
 import javax.cache.processor.EntryProcessorResult;
 import javax.cache.processor.MutableEntry;
-
 import org.apache.calcite.rel.core.TableModify;
+import org.apache.calcite.rel.type.RelDataType;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
@@ -77,11 +76,12 @@ public class ModifyNode<Row> extends AbstractNode<Row> 
implements SingleNode<Row
      */
     public ModifyNode(
         ExecutionContext<Row> ctx,
+        RelDataType rowType,
         TableDescriptor desc,
         TableModify.Operation op,
         List<String> cols
     ) {
-        super(ctx);
+        super(ctx, rowType);
 
         this.desc = desc;
         this.op = op;
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java
index 9a90d36..b634ac2 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java
@@ -23,7 +23,6 @@ import java.util.BitSet;
 import java.util.Deque;
 import java.util.List;
 import java.util.function.Predicate;
-
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.ignite.IgniteCheckedException;
@@ -65,8 +64,8 @@ public abstract class NestedLoopJoinNode<Row> extends 
AbstractNode<Row> {
      * @param ctx Execution context.
      * @param cond Join expression.
      */
-    private NestedLoopJoinNode(ExecutionContext<Row> ctx, Predicate<Row> cond) 
{
-        super(ctx);
+    private NestedLoopJoinNode(ExecutionContext<Row> ctx, RelDataType rowType, 
Predicate<Row> cond) {
+        super(ctx, rowType);
 
         this.cond = cond;
         handler = ctx.rowHandler();
@@ -239,36 +238,36 @@ public abstract class NestedLoopJoinNode<Row> extends 
AbstractNode<Row> {
     protected abstract void join() throws IgniteCheckedException;
 
     /** */
-    @NotNull public static <Row> NestedLoopJoinNode<Row> 
create(ExecutionContext<Row> ctx, RelDataType leftRowType,
+    @NotNull public static <Row> NestedLoopJoinNode<Row> 
create(ExecutionContext<Row> ctx, RelDataType outputRowType, RelDataType 
leftRowType,
         RelDataType rightRowType, JoinRelType joinType, Predicate<Row> cond) {
         switch (joinType) {
             case INNER:
-                return new InnerJoin<>(ctx, cond);
+                return new InnerJoin<>(ctx, outputRowType, cond);
 
             case LEFT: {
                 RowHandler.RowFactory<Row> rightRowFactory = 
ctx.rowHandler().factory(ctx.getTypeFactory(), rightRowType);
 
-                return new LeftJoin<>(ctx, cond, rightRowFactory);
+                return new LeftJoin<>(ctx, outputRowType, cond, 
rightRowFactory);
             }
 
             case RIGHT: {
                 RowHandler.RowFactory<Row> leftRowFactory = 
ctx.rowHandler().factory(ctx.getTypeFactory(), leftRowType);
 
-                return new RightJoin<>(ctx, cond, leftRowFactory);
+                return new RightJoin<>(ctx, outputRowType, cond, 
leftRowFactory);
             }
 
             case FULL: {
                 RowHandler.RowFactory<Row> leftRowFactory = 
ctx.rowHandler().factory(ctx.getTypeFactory(), leftRowType);
                 RowHandler.RowFactory<Row> rightRowFactory = 
ctx.rowHandler().factory(ctx.getTypeFactory(), rightRowType);
 
-                return new FullOuterJoin<>(ctx, cond, leftRowFactory, 
rightRowFactory);
+                return new FullOuterJoin<>(ctx, outputRowType, cond, 
leftRowFactory, rightRowFactory);
             }
 
             case SEMI:
-                return new SemiJoin<>(ctx, cond);
+                return new SemiJoin<>(ctx, outputRowType, cond);
 
             case ANTI:
-                return new AntiJoin<>(ctx, cond);
+                return new AntiJoin<>(ctx, outputRowType, cond);
 
             default:
                 throw new IllegalStateException("Join type \"" + joinType + 
"\" is not supported yet");
@@ -287,8 +286,8 @@ public abstract class NestedLoopJoinNode<Row> extends 
AbstractNode<Row> {
          * @param ctx Execution context.
          * @param cond Join expression.
          */
-        public InnerJoin(ExecutionContext<Row> ctx, Predicate<Row> cond) {
-            super(ctx, cond);
+        public InnerJoin(ExecutionContext<Row> ctx, RelDataType rowType, 
Predicate<Row> cond) {
+            super(ctx, rowType, cond);
         }
 
         /** {@inheritDoc} */
@@ -362,8 +361,8 @@ public abstract class NestedLoopJoinNode<Row> extends 
AbstractNode<Row> {
          * @param ctx Execution context.
          * @param cond Join expression.
          */
-        public LeftJoin(ExecutionContext<Row> ctx, Predicate<Row> cond, 
RowHandler.RowFactory<Row> rightRowFactory) {
-            super(ctx, cond);
+        public LeftJoin(ExecutionContext<Row> ctx, RelDataType rowType, 
Predicate<Row> cond, RowHandler.RowFactory<Row> rightRowFactory) {
+            super(ctx, rowType, cond);
 
             this.rightRowFactory = rightRowFactory;
         }
@@ -458,8 +457,8 @@ public abstract class NestedLoopJoinNode<Row> extends 
AbstractNode<Row> {
          * @param ctx Execution context.
          * @param cond Join expression.
          */
-        public RightJoin(ExecutionContext<Row> ctx, Predicate<Row> cond, 
RowHandler.RowFactory<Row> leftRowFactory) {
-            super(ctx, cond);
+        public RightJoin(ExecutionContext<Row> ctx, RelDataType rowType, 
Predicate<Row> cond, RowHandler.RowFactory<Row> leftRowFactory) {
+            super(ctx, rowType, cond);
 
             this.leftRowFactory = leftRowFactory;
         }
@@ -584,9 +583,9 @@ public abstract class NestedLoopJoinNode<Row> extends 
AbstractNode<Row> {
          * @param ctx Execution context.
          * @param cond Join expression.
          */
-        public FullOuterJoin(ExecutionContext<Row> ctx, Predicate<Row> cond, 
RowHandler.RowFactory<Row> leftRowFactory,
+        public FullOuterJoin(ExecutionContext<Row> ctx, RelDataType rowType, 
Predicate<Row> cond, RowHandler.RowFactory<Row> leftRowFactory,
             RowHandler.RowFactory<Row> rightRowFactory) {
-            super(ctx, cond);
+            super(ctx, rowType, cond);
 
             this.leftRowFactory = leftRowFactory;
             this.rightRowFactory = rightRowFactory;
@@ -713,8 +712,8 @@ public abstract class NestedLoopJoinNode<Row> extends 
AbstractNode<Row> {
          * @param ctx Execution context.
          * @param cond Join expression.
          */
-        public SemiJoin(ExecutionContext<Row> ctx, Predicate<Row> cond) {
-            super(ctx, cond);
+        public SemiJoin(ExecutionContext<Row> ctx, RelDataType rowType, 
Predicate<Row> cond) {
+            super(ctx, rowType, cond);
         }
 
         /** {@inheritDoc} */
@@ -781,8 +780,8 @@ public abstract class NestedLoopJoinNode<Row> extends 
AbstractNode<Row> {
          * @param ctx Execution context.
          * @param cond Join expression.
          */
-        public AntiJoin(ExecutionContext<Row> ctx, Predicate<Row> cond) {
-            super(ctx, cond);
+        public AntiJoin(ExecutionContext<Row> ctx, RelDataType rowType, 
Predicate<Row> cond) {
+            super(ctx, rowType, cond);
         }
 
         /** */
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Node.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Node.java
index d0d8e51..028baa6 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Node.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Node.java
@@ -18,7 +18,7 @@
 package org.apache.ignite.internal.processors.query.calcite.exec.rel;
 
 import java.util.List;
-
+import org.apache.calcite.rel.type.RelDataType;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
 
 /**
@@ -36,6 +36,13 @@ public interface Node<Row> extends AutoCloseable {
     ExecutionContext<Row> context();
 
     /**
+     * Returns logical node output row type.
+     *
+     * @return Logical node output row type.
+     */
+    RelDataType rowType();
+
+    /**
      * @return Node downstream.
      */
     Downstream<Row> downstream();
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java
index c5105f3..3c44646 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.stream.Collectors;
+import org.apache.calcite.rel.type.RelDataType;
 import org.apache.ignite.IgniteCheckedException;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.ExchangeService;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
@@ -73,13 +74,14 @@ public class Outbox<Row> extends AbstractNode<Row> 
implements Mailbox<Row>, Sing
      */
     public Outbox(
         ExecutionContext<Row> ctx,
+        RelDataType rowType,
         ExchangeService exchange,
         MailboxRegistry registry,
         long exchangeId,
         long targetFragmentId,
         Destination<Row> dest
     ) {
-        super(ctx);
+        super(ctx, rowType);
         this.exchange = exchange;
         this.registry = registry;
         this.targetFragmentId = targetFragmentId;
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ProjectNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ProjectNode.java
index dec2710..4da1284 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ProjectNode.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ProjectNode.java
@@ -18,7 +18,7 @@
 package org.apache.ignite.internal.processors.query.calcite.exec.rel;
 
 import java.util.function.Function;
-
+import org.apache.calcite.rel.type.RelDataType;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
 import org.apache.ignite.internal.util.typedef.F;
 
@@ -33,8 +33,8 @@ public class ProjectNode<Row> extends AbstractNode<Row> 
implements SingleNode<Ro
      * @param ctx Execution context.
      * @param prj Projection.
      */
-    public ProjectNode(ExecutionContext<Row> ctx, Function<Row, Row> prj) {
-        super(ctx);
+    public ProjectNode(ExecutionContext<Row> ctx, RelDataType rowType, 
Function<Row, Row> prj) {
+        super(ctx, rowType);
 
         this.prj = prj;
     }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
index 75a6962..b7cf76c 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
@@ -25,6 +25,7 @@ import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
+import org.apache.calcite.rel.type.RelDataType;
 import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
@@ -65,8 +66,8 @@ public class RootNode<Row> extends AbstractNode<Row> 
implements SingleNode<Row>,
     /**
      * @param ctx Execution context.
      */
-    public RootNode(ExecutionContext<Row> ctx) {
-        super(ctx);
+    public RootNode(ExecutionContext<Row> ctx, RelDataType rowType) {
+        super(ctx, rowType);
 
         onClose = this::closeInternal;
     }
@@ -74,8 +75,8 @@ public class RootNode<Row> extends AbstractNode<Row> 
implements SingleNode<Row>,
     /**
      * @param ctx Execution context.
      */
-    public RootNode(ExecutionContext<Row> ctx, Runnable onClose) {
-        super(ctx);
+    public RootNode(ExecutionContext<Row> ctx, RelDataType rowType, Runnable 
onClose) {
+        super(ctx, rowType);
 
         this.onClose = onClose;
     }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
index 7576c02..739c773 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
@@ -19,7 +19,7 @@ package 
org.apache.ignite.internal.processors.query.calcite.exec.rel;
 
 import java.util.Iterator;
 import java.util.List;
-
+import org.apache.calcite.rel.type.RelDataType;
 import org.apache.ignite.IgniteCheckedException;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
@@ -44,8 +44,8 @@ public class ScanNode<Row> extends AbstractNode<Row> 
implements SingleNode<Row>
      * @param ctx Execution context.
      * @param src Source.
      */
-    public ScanNode(ExecutionContext<Row> ctx, Iterable<Row> src) {
-        super(ctx);
+    public ScanNode(ExecutionContext<Row> ctx, RelDataType rowType, 
Iterable<Row> src) {
+        super(ctx, rowType);
 
         this.src = src;
     }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java
index 3ab1148..782b012 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java
@@ -18,7 +18,7 @@ package 
org.apache.ignite.internal.processors.query.calcite.exec.rel;
 
 import java.util.Comparator;
 import java.util.PriorityQueue;
-
+import org.apache.calcite.rel.type.RelDataType;
 import org.apache.ignite.IgniteCheckedException;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
 import org.apache.ignite.internal.util.typedef.F;
@@ -43,8 +43,8 @@ public class SortNode<Row> extends AbstractNode<Row> 
implements SingleNode<Row>,
      * @param ctx Execution context.
      * @param comp Rows comparator.
      */
-    public SortNode(ExecutionContext<Row> ctx, Comparator<Row> comp) {
-        super(ctx);
+    public SortNode(ExecutionContext<Row> ctx, RelDataType rowType, 
Comparator<Row> comp) {
+        super(ctx, rowType);
 
         rows = comp == null ? new PriorityQueue<>() : new 
PriorityQueue<>(comp);
     }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/UnionAllNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/UnionAllNode.java
index 0d1a6d3..385577b 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/UnionAllNode.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/UnionAllNode.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.query.calcite.exec.rel;
 
+import org.apache.calcite.rel.type.RelDataType;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
 import org.apache.ignite.internal.util.typedef.F;
 
@@ -33,8 +34,8 @@ public class UnionAllNode<Row> extends AbstractNode<Row> 
implements Downstream<R
     /**
      * @param ctx Execution context.
      */
-    public UnionAllNode(ExecutionContext<Row> ctx) {
-        super(ctx);
+    public UnionAllNode(ExecutionContext<Row> ctx, RelDataType rowType) {
+        super(ctx, rowType);
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/AbstractIndexScan.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/AbstractIndexScan.java
index ea394af..78017c8 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/AbstractIndexScan.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/AbstractIndexScan.java
@@ -92,7 +92,7 @@ public abstract class AbstractIndexScan extends 
ProjectableFilterableTableScan {
         pw = super.explainTerms0(pw);
         return pw
             .itemIf("lower", lowerBound, !F.isEmpty(lowerBound()))
-            .itemIf("upper", upperBound, !F.isEmpty(lowerBound()));
+            .itemIf("upper", upperBound, !F.isEmpty(upperBound()));
     }
 
     /**
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/PlannerTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/PlannerTest.java
index f05a2e6..4188276 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/PlannerTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/PlannerTest.java
@@ -1341,7 +1341,7 @@ public class PlannerTest extends GridCommonAbstractTest {
             exec = new LogicalRelImplementor<>(ectx, c1 -> r1 -> 0, 
mailboxRegistry, exchangeSvc,
                 new TestFailureProcessor(kernal)).go(fragment.root());
 
-            RootNode<Object[]> consumer = new RootNode<>(ectx);
+            RootNode<Object[]> consumer = new RootNode<>(ectx, exec.rowType());
             consumer.register(exec);
 
             //// Remote part
@@ -1600,7 +1600,7 @@ public class PlannerTest extends GridCommonAbstractTest {
             exec = new LogicalRelImplementor<>(ectx, c1 -> r1 -> 0, 
mailboxRegistry, exchangeSvc,
                 new TestFailureProcessor(kernal)).go(fragment.root());
 
-            RootNode<Object[]> consumer = new RootNode<>(ectx);
+            RootNode<Object[]> consumer = new RootNode<>(ectx, exec.rowType());
             consumer.register(exec);
 
             //// Remote part
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ContinuousExecutionTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ContinuousExecutionTest.java
index ae07e61..e3724f2 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ContinuousExecutionTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ContinuousExecutionTest.java
@@ -22,11 +22,13 @@ import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Random;
 import java.util.UUID;
-
 import com.google.common.collect.ImmutableList;
+import org.apache.calcite.rel.type.RelDataType;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.MailboxRegistry;
 import org.apache.ignite.internal.processors.query.calcite.trait.AllNodes;
+import 
org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -111,18 +113,21 @@ public class ContinuousExecutionTest extends 
AbstractExecutionTest {
             };
 
             ExecutionContext<Object[]> ectx = executionContext(locNodeId, 
qryId, 0);
+            IgniteTypeFactory tf = ectx.getTypeFactory();
 
-            ScanNode<Object[]> scan = new ScanNode<>(ectx, iterable);
+            RelDataType rowType = TypeUtils.createRowType(tf, int.class, 
int.class, int.class, int.class, int.class, int.class);
+            ScanNode<Object[]> scan = new ScanNode<>(ectx, rowType, iterable);
 
-            ProjectNode<Object[]> project = new ProjectNode<>(ectx, r -> new 
Object[]{r[0], r[1], r[5]});
+            rowType = TypeUtils.createRowType(tf, int.class, int.class, 
int.class);
+            ProjectNode<Object[]> project = new ProjectNode<>(ectx, rowType, r 
-> new Object[]{r[0], r[1], r[5]});
             project.register(scan);
 
-            FilterNode<Object[]> filter = new FilterNode<>(ectx, r -> 
(Integer) r[0] >= 2);
+            FilterNode<Object[]> filter = new FilterNode<>(ectx, rowType, r -> 
(Integer) r[0] >= 2);
             filter.register(project);
 
             MailboxRegistry registry = mailboxRegistry(locNodeId);
 
-            Outbox<Object[]> outbox = new Outbox<>(ectx, 
exchangeService(locNodeId), registry,
+            Outbox<Object[]> outbox = new Outbox<>(ectx, rowType, 
exchangeService(locNodeId), registry,
                 0, 1, new AllNodes(nodes.subList(0, 1)));
 
             outbox.register(filter);
@@ -134,15 +139,17 @@ public class ContinuousExecutionTest extends 
AbstractExecutionTest {
         UUID locNodeId = nodes.get(0);
 
         ExecutionContext<Object[]> ectx = executionContext(locNodeId, qryId, 
1);
+        IgniteTypeFactory tf = ectx.getTypeFactory();
 
         MailboxRegistry registry = mailboxRegistry(locNodeId);
 
         Inbox<Object[]> inbox = (Inbox<Object[]>) registry.register(
             new Inbox<>(ectx, exchangeService(locNodeId), registry, 0, 0));
 
-        inbox.init(ectx, nodes.subList(1, nodes.size()), null);
+        RelDataType rowType = TypeUtils.createRowType(tf, int.class, 
int.class, int.class);
+        inbox.init(ectx, rowType, nodes.subList(1, nodes.size()), null);
 
-        RootNode<Object[]> node = new RootNode<>(ectx);
+        RootNode<Object[]> node = new RootNode<>(ectx, rowType);
 
         node.register(inbox);
 
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java
index 8e8b900..bbbee53 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java
@@ -22,21 +22,19 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.UUID;
 import java.util.function.Supplier;
-
 import com.google.common.collect.ImmutableList;
 import org.apache.calcite.rel.RelCollations;
 import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
-import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.ImmutableIntList;
-import org.apache.calcite.util.Pair;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.ArrayRowHandler;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
 import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AccumulatorWrapper;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapAggregate;
 import 
org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
 import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
 import org.apache.ignite.internal.util.typedef.F;
@@ -80,34 +78,39 @@ public class ExecutionTest extends AbstractExecutionTest {
         // WHERE P.ID >= 2
 
         ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), 
UUID.randomUUID(), 0);
+        IgniteTypeFactory tf = ctx.getTypeFactory();
+        RelDataType rowType = TypeUtils.createRowType(tf, int.class, 
String.class, String.class);
 
-        ScanNode<Object[]> persons = new ScanNode<>(ctx, Arrays.asList(
+        ScanNode<Object[]> persons = new ScanNode<>(ctx, rowType, 
Arrays.asList(
             new Object[]{0, "Igor", "Seliverstov"},
             new Object[]{1, "Roman", "Kondakov"},
             new Object[]{2, "Ivan", "Pavlukhin"},
             new Object[]{3, "Alexey", "Goncharuk"}
         ));
 
-        ScanNode<Object[]> projects = new ScanNode<>(ctx, Arrays.asList(
+        rowType = TypeUtils.createRowType(tf, int.class, int.class, 
String.class);
+        ScanNode<Object[]> projects = new ScanNode<>(ctx, rowType, 
Arrays.asList(
             new Object[]{0, 2, "Calcite"},
             new Object[]{1, 1, "SQL"},
             new Object[]{2, 2, "Ignite"},
             new Object[]{3, 0, "Core"}
         ));
 
-        RelDataType leftType = TypeUtils.createRowType(ctx.getTypeFactory(), 
int.class, String.class, String.class);
-        RelDataType rightType = TypeUtils.createRowType(ctx.getTypeFactory(), 
int.class, int.class, String.class);
+        RelDataType outType = TypeUtils.createRowType(tf, int.class, 
String.class, String.class, int.class, int.class, String.class);
+        RelDataType leftType = TypeUtils.createRowType(tf, int.class, 
String.class, String.class);
+        RelDataType rightType = TypeUtils.createRowType(tf, int.class, 
int.class, String.class);
 
-        NestedLoopJoinNode<Object[]> join = NestedLoopJoinNode.create(ctx, 
leftType, rightType, INNER, r -> r[0] == r[4]);
+        NestedLoopJoinNode<Object[]> join = NestedLoopJoinNode.create(ctx, 
outType, leftType, rightType, INNER, r -> r[0] == r[4]);
         join.register(F.asList(persons, projects));
 
-        ProjectNode<Object[]> project = new ProjectNode<>(ctx, r -> new 
Object[]{r[0], r[1], r[5]});
+        rowType = TypeUtils.createRowType(tf, int.class, String.class, 
String.class);
+        ProjectNode<Object[]> project = new ProjectNode<>(ctx, rowType, r -> 
new Object[]{r[0], r[1], r[5]});
         project.register(join);
 
-        FilterNode<Object[]> filter = new FilterNode<>(ctx, r -> (Integer) 
r[0] >= 2);
+        FilterNode<Object[]> filter = new FilterNode<>(ctx, rowType, r -> 
(Integer) r[0] >= 2);
         filter.register(project);
 
-        RootNode<Object[]> node = new RootNode<>(ctx);
+        RootNode<Object[]> node = new RootNode<>(ctx, rowType);
         node.register(filter);
 
         assert node.hasNext();
@@ -127,31 +130,34 @@ public class ExecutionTest extends AbstractExecutionTest {
     @Test
     public void testUnionAll() {
         ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), 
UUID.randomUUID(), 0);
-        ScanNode<Object[]> scan1 = new ScanNode<>(ctx, Arrays.asList(
+        IgniteTypeFactory tf = ctx.getTypeFactory();
+        RelDataType rowType = TypeUtils.createRowType(tf, String.class, 
int.class);
+
+        ScanNode<Object[]> scan1 = new ScanNode<>(ctx, rowType, Arrays.asList(
             row("Igor", 200),
             row("Roman", 300),
             row("Ivan", 1400),
             row("Alexey", 1000)
         ));
 
-        ScanNode<Object[]> scan2 = new ScanNode<>(ctx, Arrays.asList(
+        ScanNode<Object[]> scan2 = new ScanNode<>(ctx, rowType, Arrays.asList(
             row("Igor", 200),
             row("Roman", 300),
             row("Ivan", 1400),
             row("Alexey", 1000)
         ));
 
-        ScanNode<Object[]> scan3 = new ScanNode<>(ctx, Arrays.asList(
+        ScanNode<Object[]> scan3 = new ScanNode<>(ctx, rowType, Arrays.asList(
             row("Igor", 200),
             row("Roman", 300),
             row("Ivan", 1400),
             row("Alexey", 1000)
         ));
 
-        UnionAllNode<Object[]> union = new UnionAllNode<>(ctx);
+        UnionAllNode<Object[]> union = new UnionAllNode<>(ctx, rowType);
         union.register(F.asList(scan1, scan2, scan3));
 
-        RootNode<Object[]> root = new RootNode<>(ctx);
+        RootNode<Object[]> root = new RootNode<>(ctx, rowType);
         root.register(union);
 
         assertTrue(root.hasNext());
@@ -174,28 +180,34 @@ public class ExecutionTest extends AbstractExecutionTest {
 
         ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), 
UUID.randomUUID(), 0);
 
-        ScanNode<Object[]> persons = new ScanNode<>(ctx, Arrays.asList(
+        IgniteTypeFactory tf = ctx.getTypeFactory();
+        RelDataType rowType = TypeUtils.createRowType(tf, int.class, 
String.class, Integer.class);
+
+        ScanNode<Object[]> persons = new ScanNode<>(ctx, rowType, 
Arrays.asList(
             new Object[]{0, "Igor", 1},
             new Object[]{1, "Roman", 2},
             new Object[]{2, "Ivan", null},
             new Object[]{3, "Alexey", 1}
         ));
 
-        ScanNode<Object[]> deps = new ScanNode<>(ctx, Arrays.asList(
+        rowType = TypeUtils.createRowType(tf, int.class, String.class);
+        ScanNode<Object[]> deps = new ScanNode<>(ctx, rowType, Arrays.asList(
             new Object[]{1, "Core"},
             new Object[]{2, "SQL"}
         ));
 
+        RelDataType outType = TypeUtils.createRowType(ctx.getTypeFactory(), 
int.class, String.class, Integer.class, int.class, String.class);
         RelDataType leftType = TypeUtils.createRowType(ctx.getTypeFactory(), 
int.class, String.class, Integer.class);
         RelDataType rightType = TypeUtils.createRowType(ctx.getTypeFactory(), 
int.class, String.class);
 
-        NestedLoopJoinNode<Object[]> join = NestedLoopJoinNode.create(ctx, 
leftType, rightType, LEFT, r -> r[2] == r[3]);
+        NestedLoopJoinNode<Object[]> join = NestedLoopJoinNode.create(ctx, 
outType, leftType, rightType, LEFT, r -> r[2] == r[3]);
         join.register(F.asList(persons, deps));
 
-        ProjectNode<Object[]> project = new ProjectNode<>(ctx, r -> new 
Object[]{r[0], r[1], r[4]});
+        rowType = TypeUtils.createRowType(tf, int.class, String.class, 
String.class);
+        ProjectNode<Object[]> project = new ProjectNode<>(ctx, rowType, r -> 
new Object[]{r[0], r[1], r[4]});
         project.register(join);
 
-        RootNode<Object[]> node = new RootNode<>(ctx);
+        RootNode<Object[]> node = new RootNode<>(ctx, rowType);
         node.register(project);
 
         assert node.hasNext();
@@ -222,30 +234,35 @@ public class ExecutionTest extends AbstractExecutionTest {
         //         on e.depno = d.depno
 
         ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), 
UUID.randomUUID(), 0);
+        IgniteTypeFactory tf = ctx.getTypeFactory();
+        RelDataType rowType = TypeUtils.createRowType(tf, int.class, 
String.class, Integer.class);
 
-        ScanNode<Object[]> persons = new ScanNode<>(ctx, Arrays.asList(
+        ScanNode<Object[]> persons = new ScanNode<>(ctx, rowType, 
Arrays.asList(
             new Object[]{0, "Igor", 1},
             new Object[]{1, "Roman", 2},
             new Object[]{2, "Ivan", null},
             new Object[]{3, "Alexey", 1}
         ));
 
-        ScanNode<Object[]> deps = new ScanNode<>(ctx, Arrays.asList(
+        rowType = TypeUtils.createRowType(tf, int.class, String.class);
+        ScanNode<Object[]> deps = new ScanNode<>(ctx, rowType, Arrays.asList(
             new Object[]{1, "Core"},
             new Object[]{2, "SQL"},
             new Object[]{3, "QA"}
         ));
 
+        RelDataType outType = TypeUtils.createRowType(ctx.getTypeFactory(), 
int.class, String.class, int.class, String.class, Integer.class);
         RelDataType leftType = TypeUtils.createRowType(ctx.getTypeFactory(), 
int.class, String.class);
         RelDataType rightType = TypeUtils.createRowType(ctx.getTypeFactory(), 
int.class, String.class, Integer.class);
 
-        NestedLoopJoinNode<Object[]> join = NestedLoopJoinNode.create(ctx, 
leftType, rightType, RIGHT, r -> r[0] == r[4]);
+        NestedLoopJoinNode<Object[]> join = NestedLoopJoinNode.create(ctx, 
outType, leftType, rightType, RIGHT, r -> r[0] == r[4]);
         join.register(F.asList(deps, persons));
 
-        ProjectNode<Object[]> project = new ProjectNode<>(ctx, r -> new 
Object[]{r[2], r[3], r[1]});
+        rowType = TypeUtils.createRowType(tf, int.class, String.class, 
String.class);
+        ProjectNode<Object[]> project = new ProjectNode<>(ctx, rowType, r -> 
new Object[]{r[2], r[3], r[1]});
         project.register(join);
 
-        RootNode<Object[]> node = new RootNode<>(ctx);
+        RootNode<Object[]> node = new RootNode<>(ctx, rowType);
         node.register(project);
 
         assert node.hasNext();
@@ -272,30 +289,35 @@ public class ExecutionTest extends AbstractExecutionTest {
         //              on e.depno = d.depno
 
         ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), 
UUID.randomUUID(), 0);
+        IgniteTypeFactory tf = ctx.getTypeFactory();
+        RelDataType rowType = TypeUtils.createRowType(tf, int.class, 
String.class, Integer.class);
 
-        ScanNode<Object[]> persons = new ScanNode<>(ctx, Arrays.asList(
+        ScanNode<Object[]> persons = new ScanNode<>(ctx, rowType, 
Arrays.asList(
             new Object[]{0, "Igor", 1},
             new Object[]{1, "Roman", 2},
             new Object[]{2, "Ivan", null},
             new Object[]{3, "Alexey", 1}
         ));
 
-        ScanNode<Object[]> deps = new ScanNode<>(ctx, Arrays.asList(
+        rowType = TypeUtils.createRowType(tf, int.class, String.class);
+        ScanNode<Object[]> deps = new ScanNode<>(ctx, rowType, Arrays.asList(
             new Object[]{1, "Core"},
             new Object[]{2, "SQL"},
             new Object[]{3, "QA"}
         ));
 
+        RelDataType outType = TypeUtils.createRowType(ctx.getTypeFactory(), 
int.class, String.class, Integer.class, int.class, String.class);
         RelDataType leftType = TypeUtils.createRowType(ctx.getTypeFactory(), 
int.class, String.class, Integer.class);
         RelDataType rightType = TypeUtils.createRowType(ctx.getTypeFactory(), 
int.class, String.class);
 
-        NestedLoopJoinNode<Object[]> join = NestedLoopJoinNode.create(ctx, 
leftType, rightType, FULL, r -> r[2] == r[3]);
+        NestedLoopJoinNode<Object[]> join = NestedLoopJoinNode.create(ctx, 
outType, leftType, rightType, FULL, r -> r[2] == r[3]);
         join.register(F.asList(persons, deps));
 
-        ProjectNode<Object[]> project = new ProjectNode<>(ctx, r -> new 
Object[]{r[0], r[1], r[4]});
+        rowType = TypeUtils.createRowType(tf, Integer.class, String.class, 
String.class);
+        ProjectNode<Object[]> project = new ProjectNode<>(ctx, rowType, r -> 
new Object[]{r[0], r[1], r[4]});
         project.register(join);
 
-        RootNode<Object[]> node = new RootNode<>(ctx);
+        RootNode<Object[]> node = new RootNode<>(ctx, rowType);
         node.register(project);
 
         assert node.hasNext();
@@ -323,30 +345,35 @@ public class ExecutionTest extends AbstractExecutionTest {
         //        on e.depno = d.depno
 
         ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), 
UUID.randomUUID(), 0);
+        IgniteTypeFactory tf = ctx.getTypeFactory();
+        RelDataType rowType = TypeUtils.createRowType(tf, int.class, 
String.class, Integer.class);
 
-        ScanNode<Object[]> persons = new ScanNode<>(ctx, Arrays.asList(
+        ScanNode<Object[]> persons = new ScanNode<>(ctx, rowType, 
Arrays.asList(
             new Object[]{0, "Igor", 1},
             new Object[]{1, "Roman", 2},
             new Object[]{2, "Ivan", null},
             new Object[]{3, "Alexey", 1}
         ));
 
-        ScanNode<Object[]> deps = new ScanNode<>(ctx, Arrays.asList(
+        rowType = TypeUtils.createRowType(tf, int.class, String.class);
+        ScanNode<Object[]> deps = new ScanNode<>(ctx, rowType, Arrays.asList(
             new Object[]{1, "Core"},
             new Object[]{2, "SQL"},
             new Object[]{3, "QA"}
         ));
 
+        RelDataType outType = TypeUtils.createRowType(ctx.getTypeFactory(), 
int.class, String.class, Integer.class);
         RelDataType leftType = TypeUtils.createRowType(ctx.getTypeFactory(), 
int.class, String.class, Integer.class);
         RelDataType rightType = TypeUtils.createRowType(ctx.getTypeFactory(), 
int.class, String.class);
 
-        NestedLoopJoinNode<Object[]> join = NestedLoopJoinNode.create(ctx, 
leftType, rightType, SEMI, r -> r[0] == r[4]);
+        NestedLoopJoinNode<Object[]> join = NestedLoopJoinNode.create(ctx, 
outType, leftType, rightType, SEMI, r -> r[0] == r[4]);
         join.register(F.asList(deps, persons));
 
-        ProjectNode<Object[]> project = new ProjectNode<>(ctx, r -> new 
Object[]{r[1]});
+        rowType = TypeUtils.createRowType(tf, String.class);
+        ProjectNode<Object[]> project = new ProjectNode<>(ctx, rowType, r -> 
new Object[]{r[1]});
         project.register(join);
 
-        RootNode<Object[]> node = new RootNode<>(ctx);
+        RootNode<Object[]> node = new RootNode<>(ctx, rowType);
         node.register(project);
 
         assert node.hasNext();
@@ -371,30 +398,35 @@ public class ExecutionTest extends AbstractExecutionTest {
         //        on e.depno = d.depno
 
         ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), 
UUID.randomUUID(), 0);
+        IgniteTypeFactory tf = ctx.getTypeFactory();
+        RelDataType rowType = TypeUtils.createRowType(tf, int.class, 
String.class, Integer.class);
 
-        ScanNode<Object[]> persons = new ScanNode<>(ctx, Arrays.asList(
+        ScanNode<Object[]> persons = new ScanNode<>(ctx, rowType, 
Arrays.asList(
             new Object[]{0, "Igor", 1},
             new Object[]{1, "Roman", 2},
             new Object[]{2, "Ivan", null},
             new Object[]{3, "Alexey", 1}
         ));
 
-        ScanNode<Object[]> deps = new ScanNode<>(ctx, Arrays.asList(
+        rowType = TypeUtils.createRowType(tf, int.class, String.class);
+        ScanNode<Object[]> deps = new ScanNode<>(ctx, rowType, Arrays.asList(
             new Object[]{1, "Core"},
             new Object[]{2, "SQL"},
             new Object[]{3, "QA"}
         ));
 
+        RelDataType outType = TypeUtils.createRowType(ctx.getTypeFactory(), 
int.class, String.class, Integer.class);
         RelDataType leftType = TypeUtils.createRowType(ctx.getTypeFactory(), 
int.class, String.class, Integer.class);
         RelDataType rightType = TypeUtils.createRowType(ctx.getTypeFactory(), 
int.class, String.class);
 
-        NestedLoopJoinNode<Object[]> join = NestedLoopJoinNode.create(ctx, 
leftType, rightType, ANTI, r -> r[0] == r[4]);
+        NestedLoopJoinNode<Object[]> join = NestedLoopJoinNode.create(ctx, 
outType, leftType, rightType, ANTI, r -> r[0] == r[4]);
         join.register(F.asList(deps, persons));
 
-        ProjectNode<Object[]> project = new ProjectNode<>(ctx, r -> new 
Object[]{r[1]});
+        rowType = TypeUtils.createRowType(tf, String.class);
+        ProjectNode<Object[]> project = new ProjectNode<>(ctx, rowType, r -> 
new Object[]{r[1]});
         project.register(join);
 
-        RootNode<Object[]> node = new RootNode<>(ctx);
+        RootNode<Object[]> node = new RootNode<>(ctx, rowType);
         node.register(project);
 
         assert node.hasNext();
@@ -413,22 +445,15 @@ public class ExecutionTest extends AbstractExecutionTest {
     @Test
     public void testAggregateMapReduceAvg() {
         ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), 
UUID.randomUUID(), 0);
-
-        ScanNode<Object[]> scan = new ScanNode<>(ctx, Arrays.asList(
+        IgniteTypeFactory tf = ctx.getTypeFactory();
+        RelDataType rowType = TypeUtils.createRowType(tf, String.class, 
int.class);
+        ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
             row("Igor", 200),
             row("Roman", 300),
             row("Ivan", 1400),
             row("Alexey", 1000)
         ));
 
-        IgniteTypeFactory typeFactory = ctx.getTypeFactory();
-
-        RelDataType rowType = typeFactory.createStructType(F.asList(
-            Pair.of("NAME", typeFactory.createSqlType(SqlTypeName.VARCHAR)),
-            Pair.of("SALARY", 
typeFactory.createSqlType(SqlTypeName.INTEGER))));
-
-        ImmutableList<ImmutableBitSet> grpSets = 
ImmutableList.of(ImmutableBitSet.of());
-
         AggregateCall call = AggregateCall.create(
             SqlStdOperatorTable.AVG,
             false,
@@ -437,16 +462,20 @@ public class ExecutionTest extends AbstractExecutionTest {
             ImmutableIntList.of(1),
             -1,
             RelCollations.EMPTY,
-            typeFactory.createSqlType(SqlTypeName.DOUBLE),
+            tf.createJavaType(double.class),
             null);
 
-        AggregateNode<Object[]> map = new AggregateNode<>(ctx, MAP, grpSets, 
accFactory(ctx, call, MAP, rowType), rowFactory());
+        ImmutableList<ImmutableBitSet> grpSets = 
ImmutableList.of(ImmutableBitSet.of());
+
+        RelDataType mapType = IgniteMapAggregate.rowType(tf);
+        AggregateNode<Object[]> map = new AggregateNode<>(ctx, mapType, MAP, 
grpSets, accFactory(ctx, call, MAP, rowType), rowFactory());
         map.register(scan);
 
-        AggregateNode<Object[]> reduce = new AggregateNode<>(ctx, REDUCE, 
grpSets, accFactory(ctx, call, REDUCE, rowType), rowFactory());
+        RelDataType reduceType = TypeUtils.createRowType(tf, double.class);
+        AggregateNode<Object[]> reduce = new AggregateNode<>(ctx, reduceType, 
REDUCE, grpSets, accFactory(ctx, call, REDUCE, null), rowFactory());
         reduce.register(map);
 
-        RootNode<Object[]> root = new RootNode<>(ctx);
+        RootNode<Object[]> root = new RootNode<>(ctx, reduceType);
         root.register(reduce);
 
         assertTrue(root.hasNext());
@@ -458,23 +487,15 @@ public class ExecutionTest extends AbstractExecutionTest {
     @Test
     public void testAggregateMapReduceSum() {
         ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), 
UUID.randomUUID(), 0);
-        ScanNode<Object[]> scan = new ScanNode<>(ctx, Arrays.asList(
+        IgniteTypeFactory tf = ctx.getTypeFactory();
+        RelDataType rowType = TypeUtils.createRowType(tf, String.class, 
int.class);
+        ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
             row("Igor", 200),
             row("Roman", 300),
             row("Ivan", 1400),
             row("Alexey", 1000)
         ));
 
-        IgniteTypeFactory typeFactory = ctx.getTypeFactory();
-
-        RelDataType rowType = typeFactory.createStructType(F.asList(
-            Pair.of("NAME", typeFactory.createSqlType(SqlTypeName.VARCHAR)),
-            Pair.of("SALARY", 
typeFactory.createSqlType(SqlTypeName.INTEGER))));
-
-        // empty groups means SELECT SUM(field) FROM table
-        ImmutableList<ImmutableBitSet> grpSets = 
ImmutableList.of(ImmutableBitSet.of());
-
-        // AVG on second field
         AggregateCall call = AggregateCall.create(
             SqlStdOperatorTable.SUM,
             false,
@@ -483,16 +504,20 @@ public class ExecutionTest extends AbstractExecutionTest {
             ImmutableIntList.of(1),
             -1,
             RelCollations.EMPTY,
-            typeFactory.createSqlType(SqlTypeName.INTEGER),
+            tf.createJavaType(int.class),
             null);
 
-        AggregateNode<Object[]> map = new AggregateNode<>(ctx, MAP, grpSets, 
accFactory(ctx, call, MAP, rowType), rowFactory());
+        ImmutableList<ImmutableBitSet> grpSets = 
ImmutableList.of(ImmutableBitSet.of());
+
+        RelDataType mapType = IgniteMapAggregate.rowType(tf);
+        AggregateNode<Object[]> map = new AggregateNode<>(ctx, mapType, MAP, 
grpSets, accFactory(ctx, call, MAP, rowType), rowFactory());
         map.register(scan);
 
-        AggregateNode<Object[]> reduce = new AggregateNode<>(ctx, REDUCE, 
grpSets, accFactory(ctx, call, REDUCE, rowType), rowFactory());
+        RelDataType reduceType = TypeUtils.createRowType(tf, int.class);
+        AggregateNode<Object[]> reduce = new AggregateNode<>(ctx, reduceType, 
REDUCE, grpSets, accFactory(ctx, call, REDUCE, null), rowFactory());
         reduce.register(map);
 
-        RootNode<Object[]> root = new RootNode<>(ctx);
+        RootNode<Object[]> root = new RootNode<>(ctx, reduceType);
         root.register(reduce);
 
         assertTrue(root.hasNext());
@@ -504,22 +529,15 @@ public class ExecutionTest extends AbstractExecutionTest {
     @Test
     public void testAggregateMapReduceMin() {
         ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), 
UUID.randomUUID(), 0);
-
-        ScanNode<Object[]> scan = new ScanNode<>(ctx, Arrays.asList(
+        IgniteTypeFactory tf = ctx.getTypeFactory();
+        RelDataType rowType = TypeUtils.createRowType(tf, String.class, 
int.class);
+        ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
             row("Igor", 200),
             row("Roman", 300),
             row("Ivan", 1400),
             row("Alexey", 1000)
         ));
 
-        IgniteTypeFactory typeFactory = ctx.getTypeFactory();
-
-        RelDataType rowType = typeFactory.createStructType(F.asList(
-            Pair.of("NAME", typeFactory.createSqlType(SqlTypeName.VARCHAR)),
-            Pair.of("SALARY", 
typeFactory.createSqlType(SqlTypeName.INTEGER))));
-
-        ImmutableList<ImmutableBitSet> grpSets = 
ImmutableList.of(ImmutableBitSet.of());
-
         AggregateCall call = AggregateCall.create(
             SqlStdOperatorTable.MIN,
             false,
@@ -528,16 +546,20 @@ public class ExecutionTest extends AbstractExecutionTest {
             ImmutableIntList.of(1),
             -1,
             RelCollations.EMPTY,
-            typeFactory.createSqlType(SqlTypeName.INTEGER),
+            tf.createJavaType(int.class),
             null);
 
-        AggregateNode<Object[]> map = new AggregateNode<>(ctx, MAP, grpSets, 
accFactory(ctx, call, MAP, rowType), rowFactory());
+        ImmutableList<ImmutableBitSet> grpSets = 
ImmutableList.of(ImmutableBitSet.of());
+
+        RelDataType mapType = IgniteMapAggregate.rowType(tf);
+        AggregateNode<Object[]> map = new AggregateNode<>(ctx, mapType, MAP, 
grpSets, accFactory(ctx, call, MAP, rowType), rowFactory());
         map.register(scan);
 
-        AggregateNode<Object[]> reduce = new AggregateNode<>(ctx, REDUCE, 
grpSets, accFactory(ctx, call, REDUCE, rowType), rowFactory());
+        RelDataType reduceType = TypeUtils.createRowType(tf, int.class);
+        AggregateNode<Object[]> reduce = new AggregateNode<>(ctx, reduceType, 
REDUCE, grpSets, accFactory(ctx, call, REDUCE, null), rowFactory());
         reduce.register(map);
 
-        RootNode<Object[]> root = new RootNode<>(ctx);
+        RootNode<Object[]> root = new RootNode<>(ctx, reduceType);
         root.register(reduce);
 
         assertTrue(root.hasNext());
@@ -549,22 +571,15 @@ public class ExecutionTest extends AbstractExecutionTest {
     @Test
     public void testAggregateMapReduceMax() {
         ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), 
UUID.randomUUID(), 0);
-
-        ScanNode<Object[]> scan = new ScanNode<>(ctx, Arrays.asList(
+        IgniteTypeFactory tf = ctx.getTypeFactory();
+        RelDataType rowType = TypeUtils.createRowType(tf, String.class, 
int.class);
+        ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
             row("Igor", 200),
             row("Roman", 300),
             row("Ivan", 1400),
             row("Alexey", 1000)
         ));
 
-        IgniteTypeFactory typeFactory = ctx.getTypeFactory();
-
-        RelDataType rowType = typeFactory.createStructType(F.asList(
-            Pair.of("NAME", typeFactory.createSqlType(SqlTypeName.VARCHAR)),
-            Pair.of("SALARY", 
typeFactory.createSqlType(SqlTypeName.INTEGER))));
-
-        ImmutableList<ImmutableBitSet> grpSets = 
ImmutableList.of(ImmutableBitSet.of());
-
         AggregateCall call = AggregateCall.create(
             SqlStdOperatorTable.MAX,
             false,
@@ -573,16 +588,20 @@ public class ExecutionTest extends AbstractExecutionTest {
             ImmutableIntList.of(1),
             -1,
             RelCollations.EMPTY,
-            typeFactory.createSqlType(SqlTypeName.INTEGER),
+            tf.createJavaType(int.class),
             null);
 
-        AggregateNode<Object[]> map = new AggregateNode<>(ctx, MAP, grpSets, 
accFactory(ctx, call, MAP, rowType), rowFactory());
+        ImmutableList<ImmutableBitSet> grpSets = 
ImmutableList.of(ImmutableBitSet.of());
+
+        RelDataType mapType = IgniteMapAggregate.rowType(tf);
+        AggregateNode<Object[]> map = new AggregateNode<>(ctx, mapType, MAP, 
grpSets, accFactory(ctx, call, MAP, rowType), rowFactory());
         map.register(scan);
 
-        AggregateNode<Object[]> reduce = new AggregateNode<>(ctx, REDUCE, 
grpSets, accFactory(ctx, call, REDUCE, rowType), rowFactory());
+        RelDataType reduceType = TypeUtils.createRowType(tf, int.class);
+        AggregateNode<Object[]> reduce = new AggregateNode<>(ctx, reduceType, 
REDUCE, grpSets, accFactory(ctx, call, REDUCE, null), rowFactory());
         reduce.register(map);
 
-        RootNode<Object[]> root = new RootNode<>(ctx);
+        RootNode<Object[]> root = new RootNode<>(ctx, reduceType);
         root.register(reduce);
 
         assertTrue(root.hasNext());
@@ -594,22 +613,15 @@ public class ExecutionTest extends AbstractExecutionTest {
     @Test
     public void testAggregateMapReduceCount() {
         ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), 
UUID.randomUUID(), 0);
-
-        ScanNode<Object[]> scan = new ScanNode<>(ctx, Arrays.asList(
+        IgniteTypeFactory tf = ctx.getTypeFactory();
+        RelDataType rowType = TypeUtils.createRowType(tf, String.class, 
int.class);
+        ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
             row("Igor", 200),
             row("Roman", 300),
             row("Ivan", 1400),
             row("Alexey", 1000)
         ));
 
-        IgniteTypeFactory typeFactory = ctx.getTypeFactory();
-
-        RelDataType rowType = typeFactory.createStructType(F.asList(
-            Pair.of("NAME", typeFactory.createSqlType(SqlTypeName.VARCHAR)),
-            Pair.of("SALARY", 
typeFactory.createSqlType(SqlTypeName.INTEGER))));
-
-        ImmutableList<ImmutableBitSet> grpSets = 
ImmutableList.of(ImmutableBitSet.of());
-
         AggregateCall call = AggregateCall.create(
             SqlStdOperatorTable.COUNT,
             false,
@@ -618,16 +630,20 @@ public class ExecutionTest extends AbstractExecutionTest {
             ImmutableIntList.of(),
             -1,
             RelCollations.EMPTY,
-            typeFactory.createSqlType(SqlTypeName.INTEGER),
+            tf.createJavaType(int.class),
             null);
 
-        AggregateNode<Object[]> map = new AggregateNode<>(ctx, MAP, grpSets, 
accFactory(ctx, call, MAP, rowType), rowFactory());
+        ImmutableList<ImmutableBitSet> grpSets = 
ImmutableList.of(ImmutableBitSet.of());
+
+        RelDataType mapType = IgniteMapAggregate.rowType(tf);
+        AggregateNode<Object[]> map = new AggregateNode<>(ctx, mapType, MAP, 
grpSets, accFactory(ctx, call, MAP, rowType), rowFactory());
         map.register(scan);
 
-        AggregateNode<Object[]> reduce = new AggregateNode<>(ctx, REDUCE, 
grpSets, accFactory(ctx, call, REDUCE, rowType), rowFactory());
+        RelDataType reduceType = TypeUtils.createRowType(tf, int.class);
+        AggregateNode<Object[]> reduce = new AggregateNode<>(ctx, reduceType, 
REDUCE, grpSets, accFactory(ctx, call, REDUCE, null), rowFactory());
         reduce.register(map);
 
-        RootNode<Object[]> root = new RootNode<>(ctx);
+        RootNode<Object[]> root = new RootNode<>(ctx, reduceType);
         root.register(reduce);
 
         assertTrue(root.hasNext());
@@ -639,22 +655,15 @@ public class ExecutionTest extends AbstractExecutionTest {
     @Test
     public void testAggregateAvg() {
         ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), 
UUID.randomUUID(), 0);
-
-        ScanNode<Object[]> scan = new ScanNode<>(ctx, Arrays.asList(
+        IgniteTypeFactory tf = ctx.getTypeFactory();
+        RelDataType rowType = TypeUtils.createRowType(tf, String.class, 
int.class);
+        ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
             row("Igor", 200),
             row("Roman", 300),
             row("Ivan", 1400),
             row("Alexey", 1000)
         ));
 
-        IgniteTypeFactory typeFactory = ctx.getTypeFactory();
-
-        RelDataType rowType = typeFactory.createStructType(F.asList(
-            Pair.of("NAME", typeFactory.createSqlType(SqlTypeName.VARCHAR)),
-            Pair.of("SALARY", 
typeFactory.createSqlType(SqlTypeName.INTEGER))));
-
-        ImmutableList<ImmutableBitSet> grpSets = 
ImmutableList.of(ImmutableBitSet.of());
-
         AggregateCall call = AggregateCall.create(
             SqlStdOperatorTable.AVG,
             false,
@@ -663,13 +672,16 @@ public class ExecutionTest extends AbstractExecutionTest {
             ImmutableIntList.of(1),
             -1,
             RelCollations.EMPTY,
-            typeFactory.createSqlType(SqlTypeName.DOUBLE),
+            tf.createJavaType(double.class),
             null);
 
-        AggregateNode<Object[]> agg = new AggregateNode<>(ctx, SINGLE, 
grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory());
+        ImmutableList<ImmutableBitSet> grpSets = 
ImmutableList.of(ImmutableBitSet.of());
+
+        RelDataType aggType = TypeUtils.createRowType(tf, int.class);
+        AggregateNode<Object[]> agg = new AggregateNode<>(ctx, aggType, 
SINGLE, grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory());
         agg.register(scan);
 
-        RootNode<Object[]> root = new RootNode<>(ctx);
+        RootNode<Object[]> root = new RootNode<>(ctx, aggType);
         root.register(agg);
 
         assertTrue(root.hasNext());
@@ -681,24 +693,15 @@ public class ExecutionTest extends AbstractExecutionTest {
     @Test
     public void testAggregateSum() {
         ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), 
UUID.randomUUID(), 0);
-
-        ScanNode<Object[]> scan = new ScanNode<>(ctx, Arrays.asList(
+        IgniteTypeFactory tf = ctx.getTypeFactory();
+        RelDataType rowType = TypeUtils.createRowType(tf, String.class, 
int.class);
+        ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
             row("Igor", 200),
             row("Roman", 300),
             row("Ivan", 1400),
             row("Alexey", 1000)
         ));
 
-        IgniteTypeFactory typeFactory = ctx.getTypeFactory();
-
-        RelDataType rowType = typeFactory.createStructType(F.asList(
-            Pair.of("NAME", typeFactory.createSqlType(SqlTypeName.VARCHAR)),
-            Pair.of("SALARY", 
typeFactory.createSqlType(SqlTypeName.INTEGER))));
-
-        // empty groups means SELECT SUM(field) FROM table
-        ImmutableList<ImmutableBitSet> grpSets = 
ImmutableList.of(ImmutableBitSet.of());
-
-        // AVG on second field
         AggregateCall call = AggregateCall.create(
             SqlStdOperatorTable.SUM,
             false,
@@ -707,13 +710,16 @@ public class ExecutionTest extends AbstractExecutionTest {
             ImmutableIntList.of(1),
             -1,
             RelCollations.EMPTY,
-            typeFactory.createSqlType(SqlTypeName.INTEGER),
+            tf.createJavaType(int.class),
             null);
 
-        AggregateNode<Object[]> agg = new AggregateNode<>(ctx, SINGLE, 
grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory());
+        ImmutableList<ImmutableBitSet> grpSets = 
ImmutableList.of(ImmutableBitSet.of());
+
+        RelDataType aggType = TypeUtils.createRowType(tf, int.class);
+        AggregateNode<Object[]> agg = new AggregateNode<>(ctx, aggType, 
SINGLE, grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory());
         agg.register(scan);
 
-        RootNode<Object[]> root = new RootNode<>(ctx);
+        RootNode<Object[]> root = new RootNode<>(ctx, aggType);
         root.register(agg);
 
         assertTrue(root.hasNext());
@@ -725,22 +731,15 @@ public class ExecutionTest extends AbstractExecutionTest {
     @Test
     public void testAggregateMin() {
         ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), 
UUID.randomUUID(), 0);
-
-        ScanNode<Object[]> scan = new ScanNode<>(ctx, Arrays.asList(
+        IgniteTypeFactory tf = ctx.getTypeFactory();
+        RelDataType rowType = TypeUtils.createRowType(tf, String.class, 
int.class);
+        ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
             row("Igor", 200),
             row("Roman", 300),
             row("Ivan", 1400),
             row("Alexey", 1000)
         ));
 
-        IgniteTypeFactory typeFactory = ctx.getTypeFactory();
-
-        RelDataType rowType = typeFactory.createStructType(F.asList(
-            Pair.of("NAME", typeFactory.createSqlType(SqlTypeName.VARCHAR)),
-            Pair.of("SALARY", 
typeFactory.createSqlType(SqlTypeName.INTEGER))));
-
-        ImmutableList<ImmutableBitSet> grpSets = 
ImmutableList.of(ImmutableBitSet.of());
-
         AggregateCall call = AggregateCall.create(
             SqlStdOperatorTable.MIN,
             false,
@@ -749,13 +748,16 @@ public class ExecutionTest extends AbstractExecutionTest {
             ImmutableIntList.of(1),
             -1,
             RelCollations.EMPTY,
-            typeFactory.createSqlType(SqlTypeName.INTEGER),
+            tf.createJavaType(int.class),
             null);
 
-        AggregateNode<Object[]> agg = new AggregateNode<>(ctx, SINGLE, 
grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory());
+        ImmutableList<ImmutableBitSet> grpSets = 
ImmutableList.of(ImmutableBitSet.of());
+
+        RelDataType aggType = TypeUtils.createRowType(tf, int.class);
+        AggregateNode<Object[]> agg = new AggregateNode<>(ctx, aggType, 
SINGLE, grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory());
         agg.register(scan);
 
-        RootNode<Object[]> root = new RootNode<>(ctx);
+        RootNode<Object[]> root = new RootNode<>(ctx, aggType);
         root.register(agg);
 
         assertTrue(root.hasNext());
@@ -767,22 +769,15 @@ public class ExecutionTest extends AbstractExecutionTest {
     @Test
     public void testAggregateMax() {
         ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), 
UUID.randomUUID(), 0);
-
-        ScanNode<Object[]> scan = new ScanNode<>(ctx, Arrays.asList(
+        IgniteTypeFactory tf = ctx.getTypeFactory();
+        RelDataType rowType = TypeUtils.createRowType(tf, String.class, 
int.class);
+        ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
             row("Igor", 200),
             row("Roman", 300),
             row("Ivan", 1400),
             row("Alexey", 1000)
         ));
 
-        IgniteTypeFactory typeFactory = ctx.getTypeFactory();
-
-        RelDataType rowType = typeFactory.createStructType(F.asList(
-            Pair.of("NAME", typeFactory.createSqlType(SqlTypeName.VARCHAR)),
-            Pair.of("SALARY", 
typeFactory.createSqlType(SqlTypeName.INTEGER))));
-
-        ImmutableList<ImmutableBitSet> grpSets = 
ImmutableList.of(ImmutableBitSet.of());
-
         AggregateCall call = AggregateCall.create(
             SqlStdOperatorTable.MAX,
             false,
@@ -791,15 +786,19 @@ public class ExecutionTest extends AbstractExecutionTest {
             ImmutableIntList.of(1),
             -1,
             RelCollations.EMPTY,
-            typeFactory.createSqlType(SqlTypeName.INTEGER),
+            tf.createJavaType(int.class),
             null);
 
-        AggregateNode<Object[]> agg = new AggregateNode<>(ctx, SINGLE, 
grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory());
+        ImmutableList<ImmutableBitSet> grpSets = 
ImmutableList.of(ImmutableBitSet.of());
+
+        RelDataType aggType = TypeUtils.createRowType(tf, int.class);
+        AggregateNode<Object[]> agg = new AggregateNode<>(ctx, aggType, 
SINGLE, grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory());
         agg.register(scan);
 
-        RootNode<Object[]> root = new RootNode<>(ctx);
+        RootNode<Object[]> root = new RootNode<>(ctx, aggType);
         root.register(agg);
 
+
         assertTrue(root.hasNext());
         assertEquals(1400, root.next()[0]);
         assertFalse(root.hasNext());
@@ -809,22 +808,15 @@ public class ExecutionTest extends AbstractExecutionTest {
     @Test
     public void testAggregateCount() {
         ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), 
UUID.randomUUID(), 0);
-
-        ScanNode<Object[]> scan = new ScanNode<>(ctx, Arrays.asList(
+        IgniteTypeFactory tf = ctx.getTypeFactory();
+        RelDataType rowType = TypeUtils.createRowType(tf, String.class, 
int.class);
+        ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
             row("Igor", 200),
             row("Roman", 300),
             row("Ivan", 1400),
             row("Alexey", 1000)
         ));
 
-        IgniteTypeFactory typeFactory = ctx.getTypeFactory();
-
-        RelDataType rowType = typeFactory.createStructType(F.asList(
-            Pair.of("NAME", typeFactory.createSqlType(SqlTypeName.VARCHAR)),
-            Pair.of("SALARY", 
typeFactory.createSqlType(SqlTypeName.INTEGER))));
-
-        ImmutableList<ImmutableBitSet> grpSets = 
ImmutableList.of(ImmutableBitSet.of());
-
         AggregateCall call = AggregateCall.create(
             SqlStdOperatorTable.COUNT,
             false,
@@ -833,13 +825,16 @@ public class ExecutionTest extends AbstractExecutionTest {
             ImmutableIntList.of(),
             -1,
             RelCollations.EMPTY,
-            typeFactory.createSqlType(SqlTypeName.INTEGER),
+            tf.createJavaType(int.class),
             null);
 
-        AggregateNode<Object[]> agg = new AggregateNode<>(ctx, SINGLE, 
grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory());
+        ImmutableList<ImmutableBitSet> grpSets = 
ImmutableList.of(ImmutableBitSet.of());
+
+        RelDataType aggType = TypeUtils.createRowType(tf, int.class);
+        AggregateNode<Object[]> agg = new AggregateNode<>(ctx, aggType, 
SINGLE, grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory());
         agg.register(scan);
 
-        RootNode<Object[]> root = new RootNode<>(ctx);
+        RootNode<Object[]> root = new RootNode<>(ctx, aggType);
         root.register(agg);
 
         assertTrue(root.hasNext());
@@ -851,23 +846,15 @@ public class ExecutionTest extends AbstractExecutionTest {
     @Test
     public void testAggregateCountByGroup() {
         ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), 
UUID.randomUUID(), 0);
-
-        ScanNode<Object[]> scan = new ScanNode<>(ctx, Arrays.asList(
+        IgniteTypeFactory tf = ctx.getTypeFactory();
+        RelDataType rowType = TypeUtils.createRowType(tf, String.class, 
int.class, int.class);
+        ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
             row("Igor", 0, 200),
             row("Roman", 1, 300),
             row("Ivan", 1, 1400),
             row("Alexey", 0, 1000)
         ));
 
-        IgniteTypeFactory typeFactory = ctx.getTypeFactory();
-
-        RelDataType rowType = typeFactory.createStructType(F.asList(
-            Pair.of("NAME", typeFactory.createSqlType(SqlTypeName.VARCHAR)),
-            Pair.of("PROJECT_ID", 
typeFactory.createSqlType(SqlTypeName.INTEGER)),
-            Pair.of("SALARY", 
typeFactory.createSqlType(SqlTypeName.INTEGER))));
-
-        ImmutableList<ImmutableBitSet> grpSets = 
ImmutableList.of(ImmutableBitSet.of(1));
-
         AggregateCall call = AggregateCall.create(
             SqlStdOperatorTable.COUNT,
             false,
@@ -876,13 +863,16 @@ public class ExecutionTest extends AbstractExecutionTest {
             ImmutableIntList.of(),
             -1,
             RelCollations.EMPTY,
-            typeFactory.createSqlType(SqlTypeName.INTEGER),
+            tf.createJavaType(int.class),
             null);
 
-        AggregateNode<Object[]> agg = new AggregateNode<>(ctx, SINGLE, 
grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory());
+        ImmutableList<ImmutableBitSet> grpSets = 
ImmutableList.of(ImmutableBitSet.of(1));
+
+        RelDataType aggType = TypeUtils.createRowType(tf, int.class);
+        AggregateNode<Object[]> agg = new AggregateNode<>(ctx, aggType, 
SINGLE, grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory());
         agg.register(scan);
 
-        RootNode<Object[]> root = new RootNode<>(ctx);
+        RootNode<Object[]> root = new RootNode<>(ctx, aggType);
         root.register(agg);
 
         assertTrue(root.hasNext());

Reply via email to