This is an automated email from the ASF dual-hosted git repository.
gvvinblade pushed a commit to branch ignite-12248
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-12248 by this push:
new e9d229a row types + better cancel
e9d229a is described below
commit e9d229a7beb08a7e747cb08499f4d5c339632496
Author: Igor Seliverstov <[email protected]>
AuthorDate: Fri Oct 16 18:22:06 2020 +0300
row types + better cancel
---
.../query/calcite/exec/ExchangeServiceImpl.java | 9 +-
.../query/calcite/exec/ExecutionContext.java | 21 +-
.../query/calcite/exec/ExecutionServiceImpl.java | 15 +-
.../query/calcite/exec/LogicalRelImplementor.java | 70 ++--
.../query/calcite/exec/rel/AbstractNode.java | 22 +-
.../query/calcite/exec/rel/AggregateNode.java | 6 +-
.../exec/rel/CorrelatedNestedLoopJoinNode.java | 6 +-
.../query/calcite/exec/rel/FilterNode.java | 6 +-
.../processors/query/calcite/exec/rel/Inbox.java | 9 +-
.../query/calcite/exec/rel/ModifyNode.java | 6 +-
.../query/calcite/exec/rel/NestedLoopJoinNode.java | 43 ++-
.../processors/query/calcite/exec/rel/Node.java | 9 +-
.../processors/query/calcite/exec/rel/Outbox.java | 4 +-
.../query/calcite/exec/rel/ProjectNode.java | 6 +-
.../query/calcite/exec/rel/RootNode.java | 9 +-
.../query/calcite/exec/rel/ScanNode.java | 6 +-
.../query/calcite/exec/rel/SortNode.java | 6 +-
.../query/calcite/exec/rel/UnionAllNode.java | 5 +-
.../query/calcite/rel/AbstractIndexScan.java | 2 +-
.../processors/query/calcite/PlannerTest.java | 4 +-
.../calcite/exec/rel/ContinuousExecutionTest.java | 21 +-
.../query/calcite/exec/rel/ExecutionTest.java | 376 ++++++++++-----------
22 files changed, 362 insertions(+), 299 deletions(-)
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
index fc2879b..f8d6a10 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
@@ -21,7 +21,6 @@ import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
-
import com.google.common.collect.ImmutableMap;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridKernalContext;
@@ -158,8 +157,10 @@ public class ExchangeServiceImpl extends AbstractService
implements ExchangeServ
protected void onMessage(UUID nodeId, InboxCloseMessage msg) {
Collection<Inbox<?>> inboxes =
mailboxRegistry().inboxes(msg.queryId(), msg.fragmentId(), msg.exchangeId());
if (!F.isEmpty(inboxes)) {
- for (Inbox<?> inbox : inboxes)
+ for (Inbox<?> inbox : inboxes) {
+ inbox.context().cancel();
inbox.context().execute(inbox::close);
+ }
}
else if (log.isDebugEnabled()) {
log.debug("Stale inbox cancel message received: [" +
@@ -174,8 +175,10 @@ public class ExchangeServiceImpl extends AbstractService
implements ExchangeServ
protected void onMessage(UUID nodeId, OutboxCloseMessage msg) {
Collection<Outbox<?>> outboxes =
mailboxRegistry().outboxes(msg.queryId(), msg.fragmentId(), msg.exchangeId());
if (!F.isEmpty(outboxes)) {
- for (Outbox<?> outbox : outboxes)
+ for (Outbox<?> outbox : outboxes) {
+ outbox.context().cancel();
outbox.context().execute(outbox::close);
+ }
}
else if (log.isDebugEnabled()) {
log.debug("Stale oubox cancel message received: [" +
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
index c9a355b..e64fa74 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
@@ -20,7 +20,7 @@ package
org.apache.ignite.internal.processors.query.calcite.exec;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.calcite.DataContext;
import org.apache.calcite.linq4j.QueryProvider;
import org.apache.calcite.schema.SchemaPlus;
@@ -62,6 +62,9 @@ public class ExecutionContext<Row> implements DataContext {
private final ExpressionFactory<Row> expressionFactory;
/** */
+ private final AtomicBoolean cancelFlag = new AtomicBoolean();
+
+ /** */
private Object[] correlations = new Object[16];
/**
@@ -191,6 +194,9 @@ public class ExecutionContext<Row> implements DataContext {
/** {@inheritDoc} */
@Override public Object get(String name) {
+ if (Variable.CANCEL_FLAG.camelName.equals(name))
+ return cancelFlag;
+
return params.get(name);
}
@@ -226,4 +232,17 @@ public class ExecutionContext<Row> implements DataContext {
public void execute(Runnable task) {
executor.execute(qryId, fragmentId(), task);
}
+
+ /**
+ * Sets cancel flag, returns {@code true} if flag was changed by this call.
+ *
+ * @return {@code True} if flag was changed by this call.
+ */
+ public boolean cancel() {
+ return !cancelFlag.get() && cancelFlag.compareAndSet(false, true);
+ }
+
+ public boolean isCancelled() {
+ return cancelFlag.get();
+ }
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
index 59d6d49..ba9e1c2 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
@@ -959,7 +959,7 @@ public class ExecutionServiceImpl<Row> extends
AbstractService implements Execut
private QueryInfo(ExecutionContext<Row> ctx, MultiStepPlan plan,
Node<Row> root) {
this.ctx = ctx;
- RootNode<Row> rootNode = new RootNode<>(ctx, this::tryClose);
+ RootNode<Row> rootNode = new RootNode<>(ctx, root.rowType(),
this::tryClose);
rootNode.register(root);
this.root = rootNode;
@@ -1003,18 +1003,21 @@ public class ExecutionServiceImpl<Row> extends
AbstractService implements Execut
if (state == QueryState.RUNNING)
state0 = state = QueryState.CLOSING;
+ // 1) Cancel local fragment
+ ctx.cancel();
+
+ // 2) close local fragment
+ root.closeInternal();
+
if (state == QueryState.CLOSING && waiting.isEmpty())
state0 = state = QueryState.CLOSED;
}
if (state0 == QueryState.CLOSED) {
- // 1) unregister runing query
+ // 3) unregister running query
running.remove(ctx.queryId());
- // 2) close local fragment
- root.closeInternal();
-
- // 3) close remote fragments
+ // 4) close remote fragments
for (UUID nodeId : remotes) {
try {
exchangeService().closeOutbox(nodeId, ctx.queryId(),
-1, -1);
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
index 0b2a62c..75d9479 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
@@ -129,7 +129,7 @@ public class LogicalRelImplementor<Row> implements
IgniteRelVisitor<Node<Row>> {
// Outbox fragment ID is used as exchange ID as well.
Outbox<Row> outbox =
- new Outbox<>(ctx, exchangeSvc, mailboxRegistry, rel.exchangeId(),
rel.targetFragmentId(), dest);
+ new Outbox<>(ctx, rel.getRowType(), exchangeSvc, mailboxRegistry,
rel.exchangeId(), rel.targetFragmentId(), dest);
Node<Row> input = visit(rel.getInput());
@@ -144,7 +144,7 @@ public class LogicalRelImplementor<Row> implements
IgniteRelVisitor<Node<Row>> {
@Override public Node<Row> visit(IgniteFilter rel) {
Predicate<Row> pred = expressionFactory.predicate(rel.getCondition(),
rel.getRowType());
- FilterNode<Row> node = new FilterNode<>(ctx, pred);
+ FilterNode<Row> node = new FilterNode<>(ctx, rel.getRowType(), pred);
Node<Row> input = visit(rel.getInput());
@@ -155,7 +155,7 @@ public class LogicalRelImplementor<Row> implements
IgniteRelVisitor<Node<Row>> {
/** {@inheritDoc} */
@Override public Node<Row> visit(IgniteTrimExchange rel) {
- FilterNode<Row> node = new FilterNode<>(ctx,
partitionFilter(rel.distribution()));
+ FilterNode<Row> node = new FilterNode<>(ctx, rel.getRowType(),
partitionFilter(rel.distribution()));
Node<Row> input = visit(rel.getInput());
@@ -168,7 +168,7 @@ public class LogicalRelImplementor<Row> implements
IgniteRelVisitor<Node<Row>> {
@Override public Node<Row> visit(IgniteProject rel) {
Function<Row, Row> prj = expressionFactory.project(rel.getProjects(),
rel.getInput().getRowType());
- ProjectNode<Row> node = new ProjectNode<>(ctx, prj);
+ ProjectNode<Row> node = new ProjectNode<>(ctx, rel.getRowType(), prj);
Node<Row> input = visit(rel.getInput());
@@ -179,6 +179,7 @@ public class LogicalRelImplementor<Row> implements
IgniteRelVisitor<Node<Row>> {
/** {@inheritDoc} */
@Override public Node<Row> visit(IgniteNestedLoopJoin rel) {
+ RelDataType outType = rel.getRowType();
RelDataType leftType = rel.getLeft().getRowType();
RelDataType rightType = rel.getRight().getRowType();
JoinRelType joinType = rel.getJoinType();
@@ -186,7 +187,7 @@ public class LogicalRelImplementor<Row> implements
IgniteRelVisitor<Node<Row>> {
RelDataType rowType = combinedRowType(ctx.getTypeFactory(), leftType,
rightType);
Predicate<Row> cond = expressionFactory.predicate(rel.getCondition(),
rowType);
- Node<Row> node = NestedLoopJoinNode.create(ctx, leftType, rightType,
joinType, cond);
+ Node<Row> node = NestedLoopJoinNode.create(ctx, outType, leftType,
rightType, joinType, cond);
Node<Row> leftInput = visit(rel.getLeft());
Node<Row> rightInput = visit(rel.getRight());
@@ -198,13 +199,16 @@ public class LogicalRelImplementor<Row> implements
IgniteRelVisitor<Node<Row>> {
/** {@inheritDoc} */
@Override public Node<Row> visit(IgniteCorrelatedNestedLoopJoin rel) {
- RelDataType rowType = combinedRowType(ctx.getTypeFactory(),
rel.getLeft().getRowType(), rel.getRight().getRowType());
+ RelDataType outType = rel.getRowType();
+ RelDataType leftType = rel.getLeft().getRowType();
+ RelDataType rightType = rel.getRight().getRowType();
+ RelDataType rowType = combinedRowType(ctx.getTypeFactory(), leftType,
rightType);
Predicate<Row> cond = expressionFactory.predicate(rel.getCondition(),
rowType);
assert rel.getJoinType() == JoinRelType.INNER; // TODO LEFT, SEMI, ANTI
- Node<Row> node = new CorrelatedNestedLoopJoinNode<>(ctx, cond,
rel.getVariablesSet());
+ Node<Row> node = new CorrelatedNestedLoopJoinNode<>(ctx, outType,
cond, rel.getVariablesSet());
Node<Row> leftInput = visit(rel.getLeft());
Node<Row> rightInput = visit(rel.getRight());
@@ -226,17 +230,17 @@ public class LogicalRelImplementor<Row> implements
IgniteRelVisitor<Node<Row>> {
List<RexNode> lowerCond = rel.lowerBound();
List<RexNode> upperCond = rel.upperBound();
- RelDataType cols = tbl.getRowType(typeFactory, requiredColunms);
+ RelDataType rowType = tbl.getRowType(typeFactory, requiredColunms);
- Predicate<Row> filters = condition == null ? null :
expressionFactory.predicate(condition, cols);
+ Predicate<Row> filters = condition == null ? null :
expressionFactory.predicate(condition, rowType);
Supplier<Row> lower = lowerCond == null ? null :
expressionFactory.rowSource(lowerCond);
Supplier<Row> upper = upperCond == null ? null :
expressionFactory.rowSource(upperCond);
- Function<Row, Row> prj = projects == null ? null :
expressionFactory.project(projects, cols);
+ Function<Row, Row> prj = projects == null ? null :
expressionFactory.project(projects, rowType);
IgniteIndex idx = tbl.getIndex(rel.indexName());
Iterable<Row> rowsIter = idx.scan(ctx, filters, lower, upper, prj,
requiredColunms);
- return new ScanNode<>(ctx, rowsIter);
+ return new ScanNode<>(ctx, rowType, rowsIter);
}
/** {@inheritDoc} */
@@ -248,26 +252,28 @@ public class LogicalRelImplementor<Row> implements
IgniteRelVisitor<Node<Row>> {
IgniteTable tbl = rel.getTable().unwrap(IgniteTable.class);
IgniteTypeFactory typeFactory = ctx.getTypeFactory();
- RelDataType cols = tbl.getRowType(typeFactory, requiredColunms);
+ RelDataType rowType = tbl.getRowType(typeFactory, requiredColunms);
- Predicate<Row> filters = condition == null ? null :
expressionFactory.predicate(condition, cols);
- Function<Row, Row> prj = projects == null ? null :
expressionFactory.project(projects, cols);
+ Predicate<Row> filters = condition == null ? null :
expressionFactory.predicate(condition, rowType);
+ Function<Row, Row> prj = projects == null ? null :
expressionFactory.project(projects, rowType);
Iterable<Row> rowsIter = tbl.scan(ctx, filters, prj, requiredColunms);
- return new ScanNode<>(ctx, rowsIter);
+ return new ScanNode<>(ctx, rowType, rowsIter);
}
/** {@inheritDoc} */
@Override public Node<Row> visit(IgniteValues rel) {
List<RexLiteral> vals = Commons.flat(Commons.cast(rel.getTuples()));
- return new ScanNode<>(ctx, expressionFactory.values(vals,
rel.getRowType()));
+ RelDataType rowType = rel.getRowType();
+
+ return new ScanNode<>(ctx, rowType, expressionFactory.values(vals,
rowType));
}
/** {@inheritDoc} */
@Override public Node<Row> visit(IgniteUnionAll rel) {
- UnionAllNode<Row> node = new UnionAllNode<>(ctx);
+ UnionAllNode<Row> node = new UnionAllNode<>(ctx, rel.getRowType());
List<Node<Row>> inputs = Commons.transform(rel.getInputs(),
this::visit);
@@ -280,7 +286,7 @@ public class LogicalRelImplementor<Row> implements
IgniteRelVisitor<Node<Row>> {
@Override public Node<Row> visit(IgniteSort rel) {
RelCollation collation = rel.getCollation();
- SortNode<Row> node = new SortNode<>(ctx,
expressionFactory.comparator(collation));
+ SortNode<Row> node = new SortNode<>(ctx, rel.getRowType(),
expressionFactory.comparator(collation));
Node<Row> input = visit(rel.getInput());
@@ -295,7 +301,7 @@ public class LogicalRelImplementor<Row> implements
IgniteRelVisitor<Node<Row>> {
case INSERT:
case UPDATE:
case DELETE:
- ModifyNode<Row> node = new ModifyNode<>(ctx,
rel.getTable().unwrap(TableDescriptor.class),
+ ModifyNode<Row> node = new ModifyNode<>(ctx, rel.getRowType(),
rel.getTable().unwrap(TableDescriptor.class),
rel.getOperation(), rel.getUpdateColumnList());
Node<Row> input = visit(rel.getInput());
@@ -317,7 +323,7 @@ public class LogicalRelImplementor<Row> implements
IgniteRelVisitor<Node<Row>> {
// here may be an already created (to consume rows from remote nodes)
inbox
// without proper context, we need to init it with a right one.
- inbox.init(ctx, ctx.remoteSources(rel.exchangeId()),
expressionFactory.comparator(rel.collation()));
+ inbox.init(ctx, rel.getRowType(), ctx.remoteSources(rel.exchangeId()),
expressionFactory.comparator(rel.collation()));
return inbox;
}
@@ -326,11 +332,14 @@ public class LogicalRelImplementor<Row> implements
IgniteRelVisitor<Node<Row>> {
@Override public Node<Row> visit(IgniteAggregate rel) {
AggregateNode.AggregateType type = AggregateNode.AggregateType.SINGLE;
+ RelDataType rowType = rel.getRowType();
+ RelDataType inputType = rel.getInput().getRowType();
+
Supplier<List<AccumulatorWrapper<Row>>> accFactory =
expressionFactory.accumulatorsFactory(
- type, rel.getAggCallList(), rel.getInput().getRowType());
- RowFactory<Row> rowFactory =
ctx.rowHandler().factory(ctx.getTypeFactory(), rel.getRowType());
+ type, rel.getAggCallList(), inputType);
+ RowFactory<Row> rowFactory =
ctx.rowHandler().factory(ctx.getTypeFactory(), rowType);
- AggregateNode<Row> node = new AggregateNode<>(ctx, type,
rel.getGroupSets(), accFactory, rowFactory);
+ AggregateNode<Row> node = new AggregateNode<>(ctx, rowType, type,
rel.getGroupSets(), accFactory, rowFactory);
Node<Row> input = visit(rel.getInput());
@@ -343,11 +352,14 @@ public class LogicalRelImplementor<Row> implements
IgniteRelVisitor<Node<Row>> {
@Override public Node<Row> visit(IgniteMapAggregate rel) {
AggregateNode.AggregateType type = AggregateNode.AggregateType.MAP;
+ RelDataType rowType = rel.getRowType();
+ RelDataType inputType = rel.getInput().getRowType();
+
Supplier<List<AccumulatorWrapper<Row>>> accFactory =
expressionFactory.accumulatorsFactory(
- type, rel.getAggCallList(), rel.getInput().getRowType());
- RowFactory<Row> rowFactory =
ctx.rowHandler().factory(ctx.getTypeFactory(), rel.getRowType());
+ type, rel.getAggCallList(), inputType);
+ RowFactory<Row> rowFactory =
ctx.rowHandler().factory(ctx.getTypeFactory(), rowType);
- AggregateNode<Row> node = new AggregateNode<>(ctx, type,
rel.getGroupSets(), accFactory, rowFactory);
+ AggregateNode<Row> node = new AggregateNode<>(ctx, rowType, type,
rel.getGroupSets(), accFactory, rowFactory);
Node<Row> input = visit(rel.getInput());
@@ -360,11 +372,13 @@ public class LogicalRelImplementor<Row> implements
IgniteRelVisitor<Node<Row>> {
@Override public Node<Row> visit(IgniteReduceAggregate rel) {
AggregateNode.AggregateType type = AggregateNode.AggregateType.REDUCE;
+ RelDataType rowType = rel.getRowType();
+
Supplier<List<AccumulatorWrapper<Row>>> accFactory =
expressionFactory.accumulatorsFactory(
type, rel.aggregateCalls(), null);
- RowFactory<Row> rowFactory =
ctx.rowHandler().factory(ctx.getTypeFactory(), rel.getRowType());
+ RowFactory<Row> rowFactory =
ctx.rowHandler().factory(ctx.getTypeFactory(), rowType);
- AggregateNode<Row> node = new AggregateNode<>(ctx, type,
rel.groupSets(), accFactory, rowFactory);
+ AggregateNode<Row> node = new AggregateNode<>(ctx, rowType, type,
rel.groupSets(), accFactory, rowFactory);
Node<Row> input = visit(rel.getInput());
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java
index f021516..ab410a6 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java
@@ -20,7 +20,7 @@ package
org.apache.ignite.internal.processors.query.calcite.exec.rel;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
-
+import org.apache.calcite.rel.type.RelDataType;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
@@ -53,7 +53,10 @@ public abstract class AbstractNode<Row> implements Node<Row>
{
* creates on first message received from a remote source. This case the
context
* sets in scope of {@link Inbox#init(ExecutionContext, Collection,
Comparator)} method call.
*/
- private volatile ExecutionContext<Row> ctx;
+ private ExecutionContext<Row> ctx;
+
+ /** */
+ private RelDataType rowType;
/** */
private Downstream<Row> downstream;
@@ -67,8 +70,9 @@ public abstract class AbstractNode<Row> implements Node<Row> {
/**
* @param ctx Execution context.
*/
- protected AbstractNode(ExecutionContext<Row> ctx) {
+ protected AbstractNode(ExecutionContext<Row> ctx, RelDataType rowType) {
this.ctx = ctx;
+ this.rowType = rowType;
}
/**
@@ -86,6 +90,16 @@ public abstract class AbstractNode<Row> implements Node<Row>
{
}
/** {@inheritDoc} */
+ @Override public RelDataType rowType() {
+ return rowType;
+ }
+
+ /** */
+ protected void rowType(RelDataType rowType) {
+ this.rowType = rowType;
+ }
+
+ /** {@inheritDoc} */
@Override public void register(List<Node<Row>> sources) {
this.sources = sources;
@@ -165,7 +179,7 @@ public abstract class AbstractNode<Row> implements
Node<Row> {
/** */
protected void checkState() throws IgniteCheckedException {
- if (isClosed())
+ if (context().isCancelled())
throw new ExecutionCancelledException();
if (Thread.interrupted())
throw new IgniteInterruptedCheckedException("Thread was
interrupted.");
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AggregateNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AggregateNode.java
index 80911e0..7e242dd 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AggregateNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AggregateNode.java
@@ -25,7 +25,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
-
+import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
@@ -72,9 +72,9 @@ public class AggregateNode<Row> extends AbstractNode<Row>
implements SingleNode<
/**
* @param ctx Execution context.
*/
- public AggregateNode(ExecutionContext<Row> ctx, AggregateType type,
List<ImmutableBitSet> grpSets,
+ public AggregateNode(ExecutionContext<Row> ctx, RelDataType rowType,
AggregateType type, List<ImmutableBitSet> grpSets,
Supplier<List<AccumulatorWrapper<Row>>> accFactory, RowFactory<Row>
rowFactory) {
- super(ctx);
+ super(ctx, rowType);
this.type = type;
this.accFactory = accFactory;
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java
index d102be1..ca0d7a4 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java
@@ -22,8 +22,8 @@ import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
-
import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.type.RelDataType;
import org.apache.ignite.IgniteCheckedException;
import
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
@@ -81,8 +81,8 @@ public class CorrelatedNestedLoopJoinNode<Row> extends
AbstractNode<Row> {
* @param ctx Execution context.
* @param cond Join expression.
*/
- public CorrelatedNestedLoopJoinNode(ExecutionContext<Row> ctx,
Predicate<Row> cond, Set<CorrelationId> correlationIds) {
- super(ctx);
+ public CorrelatedNestedLoopJoinNode(ExecutionContext<Row> ctx, RelDataType
rowType, Predicate<Row> cond, Set<CorrelationId> correlationIds) {
+ super(ctx, rowType);
assert !F.isEmpty(correlationIds);
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/FilterNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/FilterNode.java
index 06703eb..cbc0922 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/FilterNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/FilterNode.java
@@ -20,7 +20,7 @@ package
org.apache.ignite.internal.processors.query.calcite.exec.rel;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Predicate;
-
+import org.apache.calcite.rel.type.RelDataType;
import org.apache.ignite.IgniteCheckedException;
import
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
import org.apache.ignite.internal.util.typedef.F;
@@ -48,8 +48,8 @@ public class FilterNode<Row> extends AbstractNode<Row>
implements SingleNode<Row
* @param ctx Execution context.
* @param pred Predicate.
*/
- public FilterNode(ExecutionContext<Row> ctx, Predicate<Row> pred) {
- super(ctx);
+ public FilterNode(ExecutionContext<Row> ctx, RelDataType rowType,
Predicate<Row> pred) {
+ super(ctx, rowType);
this.pred = pred;
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java
index 79e8e71..9c0c918 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java
@@ -27,6 +27,7 @@ import java.util.Map;
import java.util.PriorityQueue;
import java.util.UUID;
import java.util.stream.Collectors;
+import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.util.Pair;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
@@ -84,7 +85,7 @@ public class Inbox<Row> extends AbstractNode<Row> implements
Mailbox<Row>, Singl
long exchangeId,
long srcFragmentId
) {
- super(ctx);
+ super(ctx, ctx.getTypeFactory().createUnknownType());
this.exchange = exchange;
this.registry = registry;
@@ -106,11 +107,13 @@ public class Inbox<Row> extends AbstractNode<Row>
implements Mailbox<Row>, Singl
* @param srcNodeIds Source node IDs.
* @param comp Optional comparator for merge exchange.
*/
- public void init(ExecutionContext<Row> ctx, Collection<UUID> srcNodeIds,
@Nullable Comparator<Row> comp) {
+ public void init(ExecutionContext<Row> ctx, RelDataType rowType,
Collection<UUID> srcNodeIds, @Nullable Comparator<Row> comp) {
// It's important to set proper context here because
- // because the one, that is created on a first message
+ // the one, that is created on a first message
// received doesn't have all context variables in place.
context(ctx);
+ rowType(rowType);
+
this.comp = comp;
// memory barier
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java
index a89255f..5d253c3 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java
@@ -21,13 +21,12 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.EntryProcessorResult;
import javax.cache.processor.MutableEntry;
-
import org.apache.calcite.rel.core.TableModify;
+import org.apache.calcite.rel.type.RelDataType;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
@@ -77,11 +76,12 @@ public class ModifyNode<Row> extends AbstractNode<Row>
implements SingleNode<Row
*/
public ModifyNode(
ExecutionContext<Row> ctx,
+ RelDataType rowType,
TableDescriptor desc,
TableModify.Operation op,
List<String> cols
) {
- super(ctx);
+ super(ctx, rowType);
this.desc = desc;
this.op = op;
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java
index 9a90d36..b634ac2 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java
@@ -23,7 +23,6 @@ import java.util.BitSet;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;
-
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.ignite.IgniteCheckedException;
@@ -65,8 +64,8 @@ public abstract class NestedLoopJoinNode<Row> extends
AbstractNode<Row> {
* @param ctx Execution context.
* @param cond Join expression.
*/
- private NestedLoopJoinNode(ExecutionContext<Row> ctx, Predicate<Row> cond)
{
- super(ctx);
+ private NestedLoopJoinNode(ExecutionContext<Row> ctx, RelDataType rowType,
Predicate<Row> cond) {
+ super(ctx, rowType);
this.cond = cond;
handler = ctx.rowHandler();
@@ -239,36 +238,36 @@ public abstract class NestedLoopJoinNode<Row> extends
AbstractNode<Row> {
protected abstract void join() throws IgniteCheckedException;
/** */
- @NotNull public static <Row> NestedLoopJoinNode<Row>
create(ExecutionContext<Row> ctx, RelDataType leftRowType,
+ @NotNull public static <Row> NestedLoopJoinNode<Row>
create(ExecutionContext<Row> ctx, RelDataType outputRowType, RelDataType
leftRowType,
RelDataType rightRowType, JoinRelType joinType, Predicate<Row> cond) {
switch (joinType) {
case INNER:
- return new InnerJoin<>(ctx, cond);
+ return new InnerJoin<>(ctx, outputRowType, cond);
case LEFT: {
RowHandler.RowFactory<Row> rightRowFactory =
ctx.rowHandler().factory(ctx.getTypeFactory(), rightRowType);
- return new LeftJoin<>(ctx, cond, rightRowFactory);
+ return new LeftJoin<>(ctx, outputRowType, cond,
rightRowFactory);
}
case RIGHT: {
RowHandler.RowFactory<Row> leftRowFactory =
ctx.rowHandler().factory(ctx.getTypeFactory(), leftRowType);
- return new RightJoin<>(ctx, cond, leftRowFactory);
+ return new RightJoin<>(ctx, outputRowType, cond,
leftRowFactory);
}
case FULL: {
RowHandler.RowFactory<Row> leftRowFactory =
ctx.rowHandler().factory(ctx.getTypeFactory(), leftRowType);
RowHandler.RowFactory<Row> rightRowFactory =
ctx.rowHandler().factory(ctx.getTypeFactory(), rightRowType);
- return new FullOuterJoin<>(ctx, cond, leftRowFactory,
rightRowFactory);
+ return new FullOuterJoin<>(ctx, outputRowType, cond,
leftRowFactory, rightRowFactory);
}
case SEMI:
- return new SemiJoin<>(ctx, cond);
+ return new SemiJoin<>(ctx, outputRowType, cond);
case ANTI:
- return new AntiJoin<>(ctx, cond);
+ return new AntiJoin<>(ctx, outputRowType, cond);
default:
throw new IllegalStateException("Join type \"" + joinType +
"\" is not supported yet");
@@ -287,8 +286,8 @@ public abstract class NestedLoopJoinNode<Row> extends
AbstractNode<Row> {
* @param ctx Execution context.
* @param cond Join expression.
*/
- public InnerJoin(ExecutionContext<Row> ctx, Predicate<Row> cond) {
- super(ctx, cond);
+ public InnerJoin(ExecutionContext<Row> ctx, RelDataType rowType,
Predicate<Row> cond) {
+ super(ctx, rowType, cond);
}
/** {@inheritDoc} */
@@ -362,8 +361,8 @@ public abstract class NestedLoopJoinNode<Row> extends
AbstractNode<Row> {
* @param ctx Execution context.
* @param cond Join expression.
*/
- public LeftJoin(ExecutionContext<Row> ctx, Predicate<Row> cond,
RowHandler.RowFactory<Row> rightRowFactory) {
- super(ctx, cond);
+ public LeftJoin(ExecutionContext<Row> ctx, RelDataType rowType,
Predicate<Row> cond, RowHandler.RowFactory<Row> rightRowFactory) {
+ super(ctx, rowType, cond);
this.rightRowFactory = rightRowFactory;
}
@@ -458,8 +457,8 @@ public abstract class NestedLoopJoinNode<Row> extends
AbstractNode<Row> {
* @param ctx Execution context.
* @param cond Join expression.
*/
- public RightJoin(ExecutionContext<Row> ctx, Predicate<Row> cond,
RowHandler.RowFactory<Row> leftRowFactory) {
- super(ctx, cond);
+ public RightJoin(ExecutionContext<Row> ctx, RelDataType rowType,
Predicate<Row> cond, RowHandler.RowFactory<Row> leftRowFactory) {
+ super(ctx, rowType, cond);
this.leftRowFactory = leftRowFactory;
}
@@ -584,9 +583,9 @@ public abstract class NestedLoopJoinNode<Row> extends
AbstractNode<Row> {
* @param ctx Execution context.
* @param cond Join expression.
*/
- public FullOuterJoin(ExecutionContext<Row> ctx, Predicate<Row> cond,
RowHandler.RowFactory<Row> leftRowFactory,
+ public FullOuterJoin(ExecutionContext<Row> ctx, RelDataType rowType,
Predicate<Row> cond, RowHandler.RowFactory<Row> leftRowFactory,
RowHandler.RowFactory<Row> rightRowFactory) {
- super(ctx, cond);
+ super(ctx, rowType, cond);
this.leftRowFactory = leftRowFactory;
this.rightRowFactory = rightRowFactory;
@@ -713,8 +712,8 @@ public abstract class NestedLoopJoinNode<Row> extends
AbstractNode<Row> {
* @param ctx Execution context.
* @param cond Join expression.
*/
- public SemiJoin(ExecutionContext<Row> ctx, Predicate<Row> cond) {
- super(ctx, cond);
+ public SemiJoin(ExecutionContext<Row> ctx, RelDataType rowType,
Predicate<Row> cond) {
+ super(ctx, rowType, cond);
}
/** {@inheritDoc} */
@@ -781,8 +780,8 @@ public abstract class NestedLoopJoinNode<Row> extends
AbstractNode<Row> {
* @param ctx Execution context.
* @param cond Join expression.
*/
- public AntiJoin(ExecutionContext<Row> ctx, Predicate<Row> cond) {
- super(ctx, cond);
+ public AntiJoin(ExecutionContext<Row> ctx, RelDataType rowType,
Predicate<Row> cond) {
+ super(ctx, rowType, cond);
}
/** */
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Node.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Node.java
index d0d8e51..028baa6 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Node.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Node.java
@@ -18,7 +18,7 @@
package org.apache.ignite.internal.processors.query.calcite.exec.rel;
import java.util.List;
-
+import org.apache.calcite.rel.type.RelDataType;
import
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
/**
@@ -36,6 +36,13 @@ public interface Node<Row> extends AutoCloseable {
ExecutionContext<Row> context();
/**
+ * Returns logical node output row type.
+ *
+ * @return Logical node output row type.
+ */
+ RelDataType rowType();
+
+ /**
* @return Node downstream.
*/
Downstream<Row> downstream();
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java
index c5105f3..3c44646 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
+import org.apache.calcite.rel.type.RelDataType;
import org.apache.ignite.IgniteCheckedException;
import
org.apache.ignite.internal.processors.query.calcite.exec.ExchangeService;
import
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
@@ -73,13 +74,14 @@ public class Outbox<Row> extends AbstractNode<Row>
implements Mailbox<Row>, Sing
*/
public Outbox(
ExecutionContext<Row> ctx,
+ RelDataType rowType,
ExchangeService exchange,
MailboxRegistry registry,
long exchangeId,
long targetFragmentId,
Destination<Row> dest
) {
- super(ctx);
+ super(ctx, rowType);
this.exchange = exchange;
this.registry = registry;
this.targetFragmentId = targetFragmentId;
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ProjectNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ProjectNode.java
index dec2710..4da1284 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ProjectNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ProjectNode.java
@@ -18,7 +18,7 @@
package org.apache.ignite.internal.processors.query.calcite.exec.rel;
import java.util.function.Function;
-
+import org.apache.calcite.rel.type.RelDataType;
import
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
import org.apache.ignite.internal.util.typedef.F;
@@ -33,8 +33,8 @@ public class ProjectNode<Row> extends AbstractNode<Row>
implements SingleNode<Ro
* @param ctx Execution context.
* @param prj Projection.
*/
- public ProjectNode(ExecutionContext<Row> ctx, Function<Row, Row> prj) {
- super(ctx);
+ public ProjectNode(ExecutionContext<Row> ctx, RelDataType rowType,
Function<Row, Row> prj) {
+ super(ctx, rowType);
this.prj = prj;
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
index 75a6962..b7cf76c 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
@@ -25,6 +25,7 @@ import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
+import org.apache.calcite.rel.type.RelDataType;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
@@ -65,8 +66,8 @@ public class RootNode<Row> extends AbstractNode<Row>
implements SingleNode<Row>,
/**
* @param ctx Execution context.
*/
- public RootNode(ExecutionContext<Row> ctx) {
- super(ctx);
+ public RootNode(ExecutionContext<Row> ctx, RelDataType rowType) {
+ super(ctx, rowType);
onClose = this::closeInternal;
}
@@ -74,8 +75,8 @@ public class RootNode<Row> extends AbstractNode<Row>
implements SingleNode<Row>,
/**
* @param ctx Execution context.
*/
- public RootNode(ExecutionContext<Row> ctx, Runnable onClose) {
- super(ctx);
+ public RootNode(ExecutionContext<Row> ctx, RelDataType rowType, Runnable
onClose) {
+ super(ctx, rowType);
this.onClose = onClose;
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
index 7576c02..739c773 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
@@ -19,7 +19,7 @@ package
org.apache.ignite.internal.processors.query.calcite.exec.rel;
import java.util.Iterator;
import java.util.List;
-
+import org.apache.calcite.rel.type.RelDataType;
import org.apache.ignite.IgniteCheckedException;
import
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
@@ -44,8 +44,8 @@ public class ScanNode<Row> extends AbstractNode<Row>
implements SingleNode<Row>
* @param ctx Execution context.
* @param src Source.
*/
- public ScanNode(ExecutionContext<Row> ctx, Iterable<Row> src) {
- super(ctx);
+ public ScanNode(ExecutionContext<Row> ctx, RelDataType rowType,
Iterable<Row> src) {
+ super(ctx, rowType);
this.src = src;
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java
index 3ab1148..782b012 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java
@@ -18,7 +18,7 @@ package
org.apache.ignite.internal.processors.query.calcite.exec.rel;
import java.util.Comparator;
import java.util.PriorityQueue;
-
+import org.apache.calcite.rel.type.RelDataType;
import org.apache.ignite.IgniteCheckedException;
import
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
import org.apache.ignite.internal.util.typedef.F;
@@ -43,8 +43,8 @@ public class SortNode<Row> extends AbstractNode<Row>
implements SingleNode<Row>,
* @param ctx Execution context.
* @param comp Rows comparator.
*/
- public SortNode(ExecutionContext<Row> ctx, Comparator<Row> comp) {
- super(ctx);
+ public SortNode(ExecutionContext<Row> ctx, RelDataType rowType,
Comparator<Row> comp) {
+ super(ctx, rowType);
rows = comp == null ? new PriorityQueue<>() : new
PriorityQueue<>(comp);
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/UnionAllNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/UnionAllNode.java
index 0d1a6d3..385577b 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/UnionAllNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/UnionAllNode.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+import org.apache.calcite.rel.type.RelDataType;
import
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
import org.apache.ignite.internal.util.typedef.F;
@@ -33,8 +34,8 @@ public class UnionAllNode<Row> extends AbstractNode<Row>
implements Downstream<R
/**
* @param ctx Execution context.
*/
- public UnionAllNode(ExecutionContext<Row> ctx) {
- super(ctx);
+ public UnionAllNode(ExecutionContext<Row> ctx, RelDataType rowType) {
+ super(ctx, rowType);
}
/** {@inheritDoc} */
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/AbstractIndexScan.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/AbstractIndexScan.java
index ea394af..78017c8 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/AbstractIndexScan.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/AbstractIndexScan.java
@@ -92,7 +92,7 @@ public abstract class AbstractIndexScan extends
ProjectableFilterableTableScan {
pw = super.explainTerms0(pw);
return pw
.itemIf("lower", lowerBound, !F.isEmpty(lowerBound()))
- .itemIf("upper", upperBound, !F.isEmpty(lowerBound()));
+ .itemIf("upper", upperBound, !F.isEmpty(upperBound()));
}
/**
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/PlannerTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/PlannerTest.java
index f05a2e6..4188276 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/PlannerTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/PlannerTest.java
@@ -1341,7 +1341,7 @@ public class PlannerTest extends GridCommonAbstractTest {
exec = new LogicalRelImplementor<>(ectx, c1 -> r1 -> 0,
mailboxRegistry, exchangeSvc,
new TestFailureProcessor(kernal)).go(fragment.root());
- RootNode<Object[]> consumer = new RootNode<>(ectx);
+ RootNode<Object[]> consumer = new RootNode<>(ectx, exec.rowType());
consumer.register(exec);
//// Remote part
@@ -1600,7 +1600,7 @@ public class PlannerTest extends GridCommonAbstractTest {
exec = new LogicalRelImplementor<>(ectx, c1 -> r1 -> 0,
mailboxRegistry, exchangeSvc,
new TestFailureProcessor(kernal)).go(fragment.root());
- RootNode<Object[]> consumer = new RootNode<>(ectx);
+ RootNode<Object[]> consumer = new RootNode<>(ectx,exec.rowType());
consumer.register(exec);
//// Remote part
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ContinuousExecutionTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ContinuousExecutionTest.java
index ae07e61..e3724f2 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ContinuousExecutionTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ContinuousExecutionTest.java
@@ -22,11 +22,13 @@ import java.util.List;
import java.util.NoSuchElementException;
import java.util.Random;
import java.util.UUID;
-
import com.google.common.collect.ImmutableList;
+import org.apache.calcite.rel.type.RelDataType;
import
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
import
org.apache.ignite.internal.processors.query.calcite.exec.MailboxRegistry;
import org.apache.ignite.internal.processors.query.calcite.trait.AllNodes;
+import
org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -111,18 +113,21 @@ public class ContinuousExecutionTest extends
AbstractExecutionTest {
};
ExecutionContext<Object[]> ectx = executionContext(locNodeId,
qryId, 0);
+ IgniteTypeFactory tf = ectx.getTypeFactory();
- ScanNode<Object[]> scan = new ScanNode<>(ectx, iterable);
+ RelDataType rowType = TypeUtils.createRowType(tf, int.class,
int.class, int.class, int.class, int.class, int.class);
+ ScanNode<Object[]> scan = new ScanNode<>(ectx, rowType, iterable);
- ProjectNode<Object[]> project = new ProjectNode<>(ectx, r -> new
Object[]{r[0], r[1], r[5]});
+ rowType = TypeUtils.createRowType(tf, int.class, int.class,
int.class);
+ ProjectNode<Object[]> project = new ProjectNode<>(ectx, rowType, r
-> new Object[]{r[0], r[1], r[5]});
project.register(scan);
- FilterNode<Object[]> filter = new FilterNode<>(ectx, r ->
(Integer) r[0] >= 2);
+ FilterNode<Object[]> filter = new FilterNode<>(ectx, rowType, r ->
(Integer) r[0] >= 2);
filter.register(project);
MailboxRegistry registry = mailboxRegistry(locNodeId);
- Outbox<Object[]> outbox = new Outbox<>(ectx,
exchangeService(locNodeId), registry,
+ Outbox<Object[]> outbox = new Outbox<>(ectx, rowType,
exchangeService(locNodeId), registry,
0, 1, new AllNodes(nodes.subList(0, 1)));
outbox.register(filter);
@@ -134,15 +139,17 @@ public class ContinuousExecutionTest extends
AbstractExecutionTest {
UUID locNodeId = nodes.get(0);
ExecutionContext<Object[]> ectx = executionContext(locNodeId, qryId,
1);
+ IgniteTypeFactory tf = ectx.getTypeFactory();
MailboxRegistry registry = mailboxRegistry(locNodeId);
Inbox<Object[]> inbox = (Inbox<Object[]>) registry.register(
new Inbox<>(ectx, exchangeService(locNodeId), registry, 0, 0));
- inbox.init(ectx, nodes.subList(1, nodes.size()), null);
+ RelDataType rowType = TypeUtils.createRowType(tf, int.class,
int.class, int.class);
+ inbox.init(ectx, rowType, nodes.subList(1, nodes.size()), null);
- RootNode<Object[]> node = new RootNode<>(ectx);
+ RootNode<Object[]> node = new RootNode<>(ectx, rowType);
node.register(inbox);
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java
index 8e8b900..bbbee53 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java
@@ -22,21 +22,19 @@ import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.function.Supplier;
-
import com.google.common.collect.ImmutableList;
import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
-import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.ImmutableIntList;
-import org.apache.calcite.util.Pair;
import
org.apache.ignite.internal.processors.query.calcite.exec.ArrayRowHandler;
import
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
import
org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory;
import
org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AccumulatorWrapper;
+import
org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapAggregate;
import
org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
import org.apache.ignite.internal.util.typedef.F;
@@ -80,34 +78,39 @@ public class ExecutionTest extends AbstractExecutionTest {
// WHERE P.ID >= 2
ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()),
UUID.randomUUID(), 0);
+ IgniteTypeFactory tf = ctx.getTypeFactory();
+ RelDataType rowType = TypeUtils.createRowType(tf, int.class,
String.class, String.class);
- ScanNode<Object[]> persons = new ScanNode<>(ctx, Arrays.asList(
+ ScanNode<Object[]> persons = new ScanNode<>(ctx, rowType,
Arrays.asList(
new Object[]{0, "Igor", "Seliverstov"},
new Object[]{1, "Roman", "Kondakov"},
new Object[]{2, "Ivan", "Pavlukhin"},
new Object[]{3, "Alexey", "Goncharuk"}
));
- ScanNode<Object[]> projects = new ScanNode<>(ctx, Arrays.asList(
+ rowType = TypeUtils.createRowType(tf, int.class, int.class,
String.class);
+ ScanNode<Object[]> projects = new ScanNode<>(ctx, rowType,
Arrays.asList(
new Object[]{0, 2, "Calcite"},
new Object[]{1, 1, "SQL"},
new Object[]{2, 2, "Ignite"},
new Object[]{3, 0, "Core"}
));
- RelDataType leftType = TypeUtils.createRowType(ctx.getTypeFactory(),
int.class, String.class, String.class);
- RelDataType rightType = TypeUtils.createRowType(ctx.getTypeFactory(),
int.class, int.class, String.class);
+ RelDataType outType = TypeUtils.createRowType(tf, int.class,
String.class, String.class, int.class, int.class, String.class);
+ RelDataType leftType = TypeUtils.createRowType(tf, int.class,
String.class, String.class);
+ RelDataType rightType = TypeUtils.createRowType(tf, int.class,
int.class, String.class);
- NestedLoopJoinNode<Object[]> join = NestedLoopJoinNode.create(ctx,
leftType, rightType, INNER, r -> r[0] == r[4]);
+ NestedLoopJoinNode<Object[]> join = NestedLoopJoinNode.create(ctx,
outType, leftType, rightType, INNER, r -> r[0] == r[4]);
join.register(F.asList(persons, projects));
- ProjectNode<Object[]> project = new ProjectNode<>(ctx, r -> new
Object[]{r[0], r[1], r[5]});
+ rowType = TypeUtils.createRowType(tf, int.class, String.class,
String.class);
+ ProjectNode<Object[]> project = new ProjectNode<>(ctx, rowType, r ->
new Object[]{r[0], r[1], r[5]});
project.register(join);
- FilterNode<Object[]> filter = new FilterNode<>(ctx, r -> (Integer)
r[0] >= 2);
+ FilterNode<Object[]> filter = new FilterNode<>(ctx, rowType, r ->
(Integer) r[0] >= 2);
filter.register(project);
- RootNode<Object[]> node = new RootNode<>(ctx);
+ RootNode<Object[]> node = new RootNode<>(ctx, rowType);
node.register(filter);
assert node.hasNext();
@@ -127,31 +130,34 @@ public class ExecutionTest extends AbstractExecutionTest {
@Test
public void testUnionAll() {
ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()),
UUID.randomUUID(), 0);
- ScanNode<Object[]> scan1 = new ScanNode<>(ctx, Arrays.asList(
+ IgniteTypeFactory tf = ctx.getTypeFactory();
+ RelDataType rowType = TypeUtils.createRowType(tf, String.class,
int.class);
+
+ ScanNode<Object[]> scan1 = new ScanNode<>(ctx, rowType, Arrays.asList(
row("Igor", 200),
row("Roman", 300),
row("Ivan", 1400),
row("Alexey", 1000)
));
- ScanNode<Object[]> scan2 = new ScanNode<>(ctx, Arrays.asList(
+ ScanNode<Object[]> scan2 = new ScanNode<>(ctx, rowType, Arrays.asList(
row("Igor", 200),
row("Roman", 300),
row("Ivan", 1400),
row("Alexey", 1000)
));
- ScanNode<Object[]> scan3 = new ScanNode<>(ctx, Arrays.asList(
+ ScanNode<Object[]> scan3 = new ScanNode<>(ctx, rowType, Arrays.asList(
row("Igor", 200),
row("Roman", 300),
row("Ivan", 1400),
row("Alexey", 1000)
));
- UnionAllNode<Object[]> union = new UnionAllNode<>(ctx);
+ UnionAllNode<Object[]> union = new UnionAllNode<>(ctx, rowType);
union.register(F.asList(scan1, scan2, scan3));
- RootNode<Object[]> root = new RootNode<>(ctx);
+ RootNode<Object[]> root = new RootNode<>(ctx, rowType);
root.register(union);
assertTrue(root.hasNext());
@@ -174,28 +180,34 @@ public class ExecutionTest extends AbstractExecutionTest {
ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()),
UUID.randomUUID(), 0);
- ScanNode<Object[]> persons = new ScanNode<>(ctx, Arrays.asList(
+ IgniteTypeFactory tf = ctx.getTypeFactory();
+ RelDataType rowType = TypeUtils.createRowType(tf, int.class,
String.class, Integer.class);
+
+ ScanNode<Object[]> persons = new ScanNode<>(ctx, rowType,
Arrays.asList(
new Object[]{0, "Igor", 1},
new Object[]{1, "Roman", 2},
new Object[]{2, "Ivan", null},
new Object[]{3, "Alexey", 1}
));
- ScanNode<Object[]> deps = new ScanNode<>(ctx, Arrays.asList(
+ rowType = TypeUtils.createRowType(tf, int.class, String.class);
+ ScanNode<Object[]> deps = new ScanNode<>(ctx, rowType, Arrays.asList(
new Object[]{1, "Core"},
new Object[]{2, "SQL"}
));
+ RelDataType outType = TypeUtils.createRowType(ctx.getTypeFactory(),
int.class, String.class, Integer.class, int.class, String.class);
RelDataType leftType = TypeUtils.createRowType(ctx.getTypeFactory(),
int.class, String.class, Integer.class);
RelDataType rightType = TypeUtils.createRowType(ctx.getTypeFactory(),
int.class, String.class);
- NestedLoopJoinNode<Object[]> join = NestedLoopJoinNode.create(ctx,
leftType, rightType, LEFT, r -> r[2] == r[3]);
+ NestedLoopJoinNode<Object[]> join = NestedLoopJoinNode.create(ctx,
outType, leftType, rightType, LEFT, r -> r[2] == r[3]);
join.register(F.asList(persons, deps));
- ProjectNode<Object[]> project = new ProjectNode<>(ctx, r -> new
Object[]{r[0], r[1], r[4]});
+ rowType = TypeUtils.createRowType(tf, int.class, String.class,
String.class);
+ ProjectNode<Object[]> project = new ProjectNode<>(ctx, rowType, r ->
new Object[]{r[0], r[1], r[4]});
project.register(join);
- RootNode<Object[]> node = new RootNode<>(ctx);
+ RootNode<Object[]> node = new RootNode<>(ctx, rowType);
node.register(project);
assert node.hasNext();
@@ -222,30 +234,35 @@ public class ExecutionTest extends AbstractExecutionTest {
// on e.depno = d.depno
ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()),
UUID.randomUUID(), 0);
+ IgniteTypeFactory tf = ctx.getTypeFactory();
+ RelDataType rowType = TypeUtils.createRowType(tf, int.class,
String.class, Integer.class);
- ScanNode<Object[]> persons = new ScanNode<>(ctx, Arrays.asList(
+ ScanNode<Object[]> persons = new ScanNode<>(ctx, rowType,
Arrays.asList(
new Object[]{0, "Igor", 1},
new Object[]{1, "Roman", 2},
new Object[]{2, "Ivan", null},
new Object[]{3, "Alexey", 1}
));
- ScanNode<Object[]> deps = new ScanNode<>(ctx, Arrays.asList(
+ rowType = TypeUtils.createRowType(tf, int.class, String.class);
+ ScanNode<Object[]> deps = new ScanNode<>(ctx, rowType, Arrays.asList(
new Object[]{1, "Core"},
new Object[]{2, "SQL"},
new Object[]{3, "QA"}
));
+ RelDataType outType = TypeUtils.createRowType(ctx.getTypeFactory(),
int.class, String.class, int.class, String.class, Integer.class);
RelDataType leftType = TypeUtils.createRowType(ctx.getTypeFactory(),
int.class, String.class);
RelDataType rightType = TypeUtils.createRowType(ctx.getTypeFactory(),
int.class, String.class, Integer.class);
- NestedLoopJoinNode<Object[]> join = NestedLoopJoinNode.create(ctx,
leftType, rightType, RIGHT, r -> r[0] == r[4]);
+ NestedLoopJoinNode<Object[]> join = NestedLoopJoinNode.create(ctx,
outType, leftType, rightType, RIGHT, r -> r[0] == r[4]);
join.register(F.asList(deps, persons));
- ProjectNode<Object[]> project = new ProjectNode<>(ctx, r -> new
Object[]{r[2], r[3], r[1]});
+ rowType = TypeUtils.createRowType(tf, int.class, String.class,
String.class);
+ ProjectNode<Object[]> project = new ProjectNode<>(ctx, rowType, r ->
new Object[]{r[2], r[3], r[1]});
project.register(join);
- RootNode<Object[]> node = new RootNode<>(ctx);
+ RootNode<Object[]> node = new RootNode<>(ctx, rowType);
node.register(project);
assert node.hasNext();
@@ -272,30 +289,35 @@ public class ExecutionTest extends AbstractExecutionTest {
// on e.depno = d.depno
ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()),
UUID.randomUUID(), 0);
+ IgniteTypeFactory tf = ctx.getTypeFactory();
+ RelDataType rowType = TypeUtils.createRowType(tf, int.class,
String.class, Integer.class);
- ScanNode<Object[]> persons = new ScanNode<>(ctx, Arrays.asList(
+ ScanNode<Object[]> persons = new ScanNode<>(ctx, rowType,
Arrays.asList(
new Object[]{0, "Igor", 1},
new Object[]{1, "Roman", 2},
new Object[]{2, "Ivan", null},
new Object[]{3, "Alexey", 1}
));
- ScanNode<Object[]> deps = new ScanNode<>(ctx, Arrays.asList(
+ rowType = TypeUtils.createRowType(tf, int.class, String.class);
+ ScanNode<Object[]> deps = new ScanNode<>(ctx, rowType, Arrays.asList(
new Object[]{1, "Core"},
new Object[]{2, "SQL"},
new Object[]{3, "QA"}
));
+ RelDataType outType = TypeUtils.createRowType(ctx.getTypeFactory(),
int.class, String.class, Integer.class, int.class, String.class);
RelDataType leftType = TypeUtils.createRowType(ctx.getTypeFactory(),
int.class, String.class, Integer.class);
RelDataType rightType = TypeUtils.createRowType(ctx.getTypeFactory(),
int.class, String.class);
- NestedLoopJoinNode<Object[]> join = NestedLoopJoinNode.create(ctx,
leftType, rightType, FULL, r -> r[2] == r[3]);
+ NestedLoopJoinNode<Object[]> join = NestedLoopJoinNode.create(ctx,
outType, leftType, rightType, FULL, r -> r[2] == r[3]);
join.register(F.asList(persons, deps));
- ProjectNode<Object[]> project = new ProjectNode<>(ctx, r -> new
Object[]{r[0], r[1], r[4]});
+ rowType = TypeUtils.createRowType(tf, Integer.class, String.class,
String.class);
+ ProjectNode<Object[]> project = new ProjectNode<>(ctx, rowType, r ->
new Object[]{r[0], r[1], r[4]});
project.register(join);
- RootNode<Object[]> node = new RootNode<>(ctx);
+ RootNode<Object[]> node = new RootNode<>(ctx, rowType);
node.register(project);
assert node.hasNext();
@@ -323,30 +345,35 @@ public class ExecutionTest extends AbstractExecutionTest {
// on e.depno = d.depno
ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()),
UUID.randomUUID(), 0);
+ IgniteTypeFactory tf = ctx.getTypeFactory();
+ RelDataType rowType = TypeUtils.createRowType(tf, int.class,
String.class, Integer.class);
- ScanNode<Object[]> persons = new ScanNode<>(ctx, Arrays.asList(
+ ScanNode<Object[]> persons = new ScanNode<>(ctx, rowType,
Arrays.asList(
new Object[]{0, "Igor", 1},
new Object[]{1, "Roman", 2},
new Object[]{2, "Ivan", null},
new Object[]{3, "Alexey", 1}
));
- ScanNode<Object[]> deps = new ScanNode<>(ctx, Arrays.asList(
+ rowType = TypeUtils.createRowType(tf, int.class, String.class);
+ ScanNode<Object[]> deps = new ScanNode<>(ctx, rowType, Arrays.asList(
new Object[]{1, "Core"},
new Object[]{2, "SQL"},
new Object[]{3, "QA"}
));
+ RelDataType outType = TypeUtils.createRowType(ctx.getTypeFactory(),
int.class, String.class, Integer.class);
RelDataType leftType = TypeUtils.createRowType(ctx.getTypeFactory(),
int.class, String.class, Integer.class);
RelDataType rightType = TypeUtils.createRowType(ctx.getTypeFactory(),
int.class, String.class);
- NestedLoopJoinNode<Object[]> join = NestedLoopJoinNode.create(ctx,
leftType, rightType, SEMI, r -> r[0] == r[4]);
+ NestedLoopJoinNode<Object[]> join = NestedLoopJoinNode.create(ctx,
outType, leftType, rightType, SEMI, r -> r[0] == r[4]);
join.register(F.asList(deps, persons));
- ProjectNode<Object[]> project = new ProjectNode<>(ctx, r -> new
Object[]{r[1]});
+ rowType = TypeUtils.createRowType(tf, String.class);
+ ProjectNode<Object[]> project = new ProjectNode<>(ctx, rowType, r ->
new Object[]{r[1]});
project.register(join);
- RootNode<Object[]> node = new RootNode<>(ctx);
+ RootNode<Object[]> node = new RootNode<>(ctx, rowType);
node.register(project);
assert node.hasNext();
@@ -371,30 +398,35 @@ public class ExecutionTest extends AbstractExecutionTest {
// on e.depno = d.depno
ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()),
UUID.randomUUID(), 0);
+ IgniteTypeFactory tf = ctx.getTypeFactory();
+ RelDataType rowType = TypeUtils.createRowType(tf, int.class,
String.class, Integer.class);
- ScanNode<Object[]> persons = new ScanNode<>(ctx, Arrays.asList(
+ ScanNode<Object[]> persons = new ScanNode<>(ctx, rowType,
Arrays.asList(
new Object[]{0, "Igor", 1},
new Object[]{1, "Roman", 2},
new Object[]{2, "Ivan", null},
new Object[]{3, "Alexey", 1}
));
- ScanNode<Object[]> deps = new ScanNode<>(ctx, Arrays.asList(
+ rowType = TypeUtils.createRowType(tf, int.class, String.class);
+ ScanNode<Object[]> deps = new ScanNode<>(ctx, rowType, Arrays.asList(
new Object[]{1, "Core"},
new Object[]{2, "SQL"},
new Object[]{3, "QA"}
));
+ RelDataType outType = TypeUtils.createRowType(ctx.getTypeFactory(),
int.class, String.class, Integer.class);
RelDataType leftType = TypeUtils.createRowType(ctx.getTypeFactory(),
int.class, String.class, Integer.class);
RelDataType rightType = TypeUtils.createRowType(ctx.getTypeFactory(),
int.class, String.class);
- NestedLoopJoinNode<Object[]> join = NestedLoopJoinNode.create(ctx,
leftType, rightType, ANTI, r -> r[0] == r[4]);
+ NestedLoopJoinNode<Object[]> join = NestedLoopJoinNode.create(ctx,
outType, leftType, rightType, ANTI, r -> r[0] == r[4]);
join.register(F.asList(deps, persons));
- ProjectNode<Object[]> project = new ProjectNode<>(ctx, r -> new
Object[]{r[1]});
+ rowType = TypeUtils.createRowType(tf, String.class);
+ ProjectNode<Object[]> project = new ProjectNode<>(ctx, rowType, r ->
new Object[]{r[1]});
project.register(join);
- RootNode<Object[]> node = new RootNode<>(ctx);
+ RootNode<Object[]> node = new RootNode<>(ctx, rowType);
node.register(project);
assert node.hasNext();
@@ -413,22 +445,15 @@ public class ExecutionTest extends AbstractExecutionTest {
@Test
public void testAggregateMapReduceAvg() {
ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()),
UUID.randomUUID(), 0);
-
- ScanNode<Object[]> scan = new ScanNode<>(ctx, Arrays.asList(
+ IgniteTypeFactory tf = ctx.getTypeFactory();
+ RelDataType rowType = TypeUtils.createRowType(tf, String.class,
int.class);
+ ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
row("Igor", 200),
row("Roman", 300),
row("Ivan", 1400),
row("Alexey", 1000)
));
- IgniteTypeFactory typeFactory = ctx.getTypeFactory();
-
- RelDataType rowType = typeFactory.createStructType(F.asList(
- Pair.of("NAME", typeFactory.createSqlType(SqlTypeName.VARCHAR)),
- Pair.of("SALARY",
typeFactory.createSqlType(SqlTypeName.INTEGER))));
-
- ImmutableList<ImmutableBitSet> grpSets =
ImmutableList.of(ImmutableBitSet.of());
-
AggregateCall call = AggregateCall.create(
SqlStdOperatorTable.AVG,
false,
@@ -437,16 +462,20 @@ public class ExecutionTest extends AbstractExecutionTest {
ImmutableIntList.of(1),
-1,
RelCollations.EMPTY,
- typeFactory.createSqlType(SqlTypeName.DOUBLE),
+ tf.createJavaType(double.class),
null);
- AggregateNode<Object[]> map = new AggregateNode<>(ctx, MAP, grpSets,
accFactory(ctx, call, MAP, rowType), rowFactory());
+ ImmutableList<ImmutableBitSet> grpSets =
ImmutableList.of(ImmutableBitSet.of());
+
+ RelDataType mapType = IgniteMapAggregate.rowType(tf);
+ AggregateNode<Object[]> map = new AggregateNode<>(ctx, mapType, MAP,
grpSets, accFactory(ctx, call, MAP, rowType), rowFactory());
map.register(scan);
- AggregateNode<Object[]> reduce = new AggregateNode<>(ctx, REDUCE,
grpSets, accFactory(ctx, call, REDUCE, rowType), rowFactory());
+ RelDataType reduceType = TypeUtils.createRowType(tf, double.class);
+ AggregateNode<Object[]> reduce = new AggregateNode<>(ctx, reduceType,
REDUCE, grpSets, accFactory(ctx, call, REDUCE, null), rowFactory());
reduce.register(map);
- RootNode<Object[]> root = new RootNode<>(ctx);
+ RootNode<Object[]> root = new RootNode<>(ctx, reduceType);
root.register(reduce);
assertTrue(root.hasNext());
@@ -458,23 +487,15 @@ public class ExecutionTest extends AbstractExecutionTest {
@Test
public void testAggregateMapReduceSum() {
ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()),
UUID.randomUUID(), 0);
- ScanNode<Object[]> scan = new ScanNode<>(ctx, Arrays.asList(
+ IgniteTypeFactory tf = ctx.getTypeFactory();
+ RelDataType rowType = TypeUtils.createRowType(tf, String.class,
int.class);
+ ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
row("Igor", 200),
row("Roman", 300),
row("Ivan", 1400),
row("Alexey", 1000)
));
- IgniteTypeFactory typeFactory = ctx.getTypeFactory();
-
- RelDataType rowType = typeFactory.createStructType(F.asList(
- Pair.of("NAME", typeFactory.createSqlType(SqlTypeName.VARCHAR)),
- Pair.of("SALARY",
typeFactory.createSqlType(SqlTypeName.INTEGER))));
-
- // empty groups means SELECT SUM(field) FROM table
- ImmutableList<ImmutableBitSet> grpSets =
ImmutableList.of(ImmutableBitSet.of());
-
- // AVG on second field
AggregateCall call = AggregateCall.create(
SqlStdOperatorTable.SUM,
false,
@@ -483,16 +504,20 @@ public class ExecutionTest extends AbstractExecutionTest {
ImmutableIntList.of(1),
-1,
RelCollations.EMPTY,
- typeFactory.createSqlType(SqlTypeName.INTEGER),
+ tf.createJavaType(int.class),
null);
- AggregateNode<Object[]> map = new AggregateNode<>(ctx, MAP, grpSets,
accFactory(ctx, call, MAP, rowType), rowFactory());
+ ImmutableList<ImmutableBitSet> grpSets =
ImmutableList.of(ImmutableBitSet.of());
+
+ RelDataType mapType = IgniteMapAggregate.rowType(tf);
+ AggregateNode<Object[]> map = new AggregateNode<>(ctx, mapType, MAP,
grpSets, accFactory(ctx, call, MAP, rowType), rowFactory());
map.register(scan);
- AggregateNode<Object[]> reduce = new AggregateNode<>(ctx, REDUCE,
grpSets, accFactory(ctx, call, REDUCE, rowType), rowFactory());
+ RelDataType reduceType = TypeUtils.createRowType(tf, int.class);
+ AggregateNode<Object[]> reduce = new AggregateNode<>(ctx, reduceType,
REDUCE, grpSets, accFactory(ctx, call, REDUCE, null), rowFactory());
reduce.register(map);
- RootNode<Object[]> root = new RootNode<>(ctx);
+ RootNode<Object[]> root = new RootNode<>(ctx, reduceType);
root.register(reduce);
assertTrue(root.hasNext());
@@ -504,22 +529,15 @@ public class ExecutionTest extends AbstractExecutionTest {
@Test
public void testAggregateMapReduceMin() {
ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()),
UUID.randomUUID(), 0);
-
- ScanNode<Object[]> scan = new ScanNode<>(ctx, Arrays.asList(
+ IgniteTypeFactory tf = ctx.getTypeFactory();
+ RelDataType rowType = TypeUtils.createRowType(tf, String.class,
int.class);
+ ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
row("Igor", 200),
row("Roman", 300),
row("Ivan", 1400),
row("Alexey", 1000)
));
- IgniteTypeFactory typeFactory = ctx.getTypeFactory();
-
- RelDataType rowType = typeFactory.createStructType(F.asList(
- Pair.of("NAME", typeFactory.createSqlType(SqlTypeName.VARCHAR)),
- Pair.of("SALARY",
typeFactory.createSqlType(SqlTypeName.INTEGER))));
-
- ImmutableList<ImmutableBitSet> grpSets =
ImmutableList.of(ImmutableBitSet.of());
-
AggregateCall call = AggregateCall.create(
SqlStdOperatorTable.MIN,
false,
@@ -528,16 +546,20 @@ public class ExecutionTest extends AbstractExecutionTest {
ImmutableIntList.of(1),
-1,
RelCollations.EMPTY,
- typeFactory.createSqlType(SqlTypeName.INTEGER),
+ tf.createJavaType(int.class),
null);
- AggregateNode<Object[]> map = new AggregateNode<>(ctx, MAP, grpSets,
accFactory(ctx, call, MAP, rowType), rowFactory());
+ ImmutableList<ImmutableBitSet> grpSets =
ImmutableList.of(ImmutableBitSet.of());
+
+ RelDataType mapType = IgniteMapAggregate.rowType(tf);
+ AggregateNode<Object[]> map = new AggregateNode<>(ctx, mapType, MAP,
grpSets, accFactory(ctx, call, MAP, rowType), rowFactory());
map.register(scan);
- AggregateNode<Object[]> reduce = new AggregateNode<>(ctx, REDUCE,
grpSets, accFactory(ctx, call, REDUCE, rowType), rowFactory());
+ RelDataType reduceType = TypeUtils.createRowType(tf, int.class);
+ AggregateNode<Object[]> reduce = new AggregateNode<>(ctx, reduceType,
REDUCE, grpSets, accFactory(ctx, call, REDUCE, null), rowFactory());
reduce.register(map);
- RootNode<Object[]> root = new RootNode<>(ctx);
+ RootNode<Object[]> root = new RootNode<>(ctx, reduceType);
root.register(reduce);
assertTrue(root.hasNext());
@@ -549,22 +571,15 @@ public class ExecutionTest extends AbstractExecutionTest {
@Test
public void testAggregateMapReduceMax() {
ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()),
UUID.randomUUID(), 0);
-
- ScanNode<Object[]> scan = new ScanNode<>(ctx, Arrays.asList(
+ IgniteTypeFactory tf = ctx.getTypeFactory();
+ RelDataType rowType = TypeUtils.createRowType(tf, String.class,
int.class);
+ ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
row("Igor", 200),
row("Roman", 300),
row("Ivan", 1400),
row("Alexey", 1000)
));
- IgniteTypeFactory typeFactory = ctx.getTypeFactory();
-
- RelDataType rowType = typeFactory.createStructType(F.asList(
- Pair.of("NAME", typeFactory.createSqlType(SqlTypeName.VARCHAR)),
- Pair.of("SALARY",
typeFactory.createSqlType(SqlTypeName.INTEGER))));
-
- ImmutableList<ImmutableBitSet> grpSets =
ImmutableList.of(ImmutableBitSet.of());
-
AggregateCall call = AggregateCall.create(
SqlStdOperatorTable.MAX,
false,
@@ -573,16 +588,20 @@ public class ExecutionTest extends AbstractExecutionTest {
ImmutableIntList.of(1),
-1,
RelCollations.EMPTY,
- typeFactory.createSqlType(SqlTypeName.INTEGER),
+ tf.createJavaType(int.class),
null);
- AggregateNode<Object[]> map = new AggregateNode<>(ctx, MAP, grpSets,
accFactory(ctx, call, MAP, rowType), rowFactory());
+ ImmutableList<ImmutableBitSet> grpSets =
ImmutableList.of(ImmutableBitSet.of());
+
+ RelDataType mapType = IgniteMapAggregate.rowType(tf);
+ AggregateNode<Object[]> map = new AggregateNode<>(ctx, mapType, MAP,
grpSets, accFactory(ctx, call, MAP, rowType), rowFactory());
map.register(scan);
- AggregateNode<Object[]> reduce = new AggregateNode<>(ctx, REDUCE,
grpSets, accFactory(ctx, call, REDUCE, rowType), rowFactory());
+ RelDataType reduceType = TypeUtils.createRowType(tf, int.class);
+ AggregateNode<Object[]> reduce = new AggregateNode<>(ctx, reduceType,
REDUCE, grpSets, accFactory(ctx, call, REDUCE, null), rowFactory());
reduce.register(map);
- RootNode<Object[]> root = new RootNode<>(ctx);
+ RootNode<Object[]> root = new RootNode<>(ctx, reduceType);
root.register(reduce);
assertTrue(root.hasNext());
@@ -594,22 +613,15 @@ public class ExecutionTest extends AbstractExecutionTest {
@Test
public void testAggregateMapReduceCount() {
ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()),
UUID.randomUUID(), 0);
-
- ScanNode<Object[]> scan = new ScanNode<>(ctx, Arrays.asList(
+ IgniteTypeFactory tf = ctx.getTypeFactory();
+ RelDataType rowType = TypeUtils.createRowType(tf, String.class,
int.class);
+ ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
row("Igor", 200),
row("Roman", 300),
row("Ivan", 1400),
row("Alexey", 1000)
));
- IgniteTypeFactory typeFactory = ctx.getTypeFactory();
-
- RelDataType rowType = typeFactory.createStructType(F.asList(
- Pair.of("NAME", typeFactory.createSqlType(SqlTypeName.VARCHAR)),
- Pair.of("SALARY",
typeFactory.createSqlType(SqlTypeName.INTEGER))));
-
- ImmutableList<ImmutableBitSet> grpSets =
ImmutableList.of(ImmutableBitSet.of());
-
AggregateCall call = AggregateCall.create(
SqlStdOperatorTable.COUNT,
false,
@@ -618,16 +630,20 @@ public class ExecutionTest extends AbstractExecutionTest {
ImmutableIntList.of(),
-1,
RelCollations.EMPTY,
- typeFactory.createSqlType(SqlTypeName.INTEGER),
+ tf.createJavaType(int.class),
null);
- AggregateNode<Object[]> map = new AggregateNode<>(ctx, MAP, grpSets,
accFactory(ctx, call, MAP, rowType), rowFactory());
+ ImmutableList<ImmutableBitSet> grpSets =
ImmutableList.of(ImmutableBitSet.of());
+
+ RelDataType mapType = IgniteMapAggregate.rowType(tf);
+ AggregateNode<Object[]> map = new AggregateNode<>(ctx, mapType, MAP,
grpSets, accFactory(ctx, call, MAP, rowType), rowFactory());
map.register(scan);
- AggregateNode<Object[]> reduce = new AggregateNode<>(ctx, REDUCE,
grpSets, accFactory(ctx, call, REDUCE, rowType), rowFactory());
+ RelDataType reduceType = TypeUtils.createRowType(tf, int.class);
+ AggregateNode<Object[]> reduce = new AggregateNode<>(ctx, reduceType,
REDUCE, grpSets, accFactory(ctx, call, REDUCE, null), rowFactory());
reduce.register(map);
- RootNode<Object[]> root = new RootNode<>(ctx);
+ RootNode<Object[]> root = new RootNode<>(ctx, reduceType);
root.register(reduce);
assertTrue(root.hasNext());
@@ -639,22 +655,15 @@ public class ExecutionTest extends AbstractExecutionTest {
@Test
public void testAggregateAvg() {
ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()),
UUID.randomUUID(), 0);
-
- ScanNode<Object[]> scan = new ScanNode<>(ctx, Arrays.asList(
+ IgniteTypeFactory tf = ctx.getTypeFactory();
+ RelDataType rowType = TypeUtils.createRowType(tf, String.class,
int.class);
+ ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
row("Igor", 200),
row("Roman", 300),
row("Ivan", 1400),
row("Alexey", 1000)
));
- IgniteTypeFactory typeFactory = ctx.getTypeFactory();
-
- RelDataType rowType = typeFactory.createStructType(F.asList(
- Pair.of("NAME", typeFactory.createSqlType(SqlTypeName.VARCHAR)),
- Pair.of("SALARY",
typeFactory.createSqlType(SqlTypeName.INTEGER))));
-
- ImmutableList<ImmutableBitSet> grpSets =
ImmutableList.of(ImmutableBitSet.of());
-
AggregateCall call = AggregateCall.create(
SqlStdOperatorTable.AVG,
false,
@@ -663,13 +672,16 @@ public class ExecutionTest extends AbstractExecutionTest {
ImmutableIntList.of(1),
-1,
RelCollations.EMPTY,
- typeFactory.createSqlType(SqlTypeName.DOUBLE),
+ tf.createJavaType(double.class),
null);
- AggregateNode<Object[]> agg = new AggregateNode<>(ctx, SINGLE,
grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory());
+ ImmutableList<ImmutableBitSet> grpSets =
ImmutableList.of(ImmutableBitSet.of());
+
+ RelDataType aggType = TypeUtils.createRowType(tf, double.class);
+ AggregateNode<Object[]> agg = new AggregateNode<>(ctx, aggType,
SINGLE, grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory());
agg.register(scan);
- RootNode<Object[]> root = new RootNode<>(ctx);
+ RootNode<Object[]> root = new RootNode<>(ctx, aggType);
root.register(agg);
assertTrue(root.hasNext());
@@ -681,24 +693,15 @@ public class ExecutionTest extends AbstractExecutionTest {
@Test
public void testAggregateSum() {
ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()),
UUID.randomUUID(), 0);
-
- ScanNode<Object[]> scan = new ScanNode<>(ctx, Arrays.asList(
+ IgniteTypeFactory tf = ctx.getTypeFactory();
+ RelDataType rowType = TypeUtils.createRowType(tf, String.class,
int.class);
+ ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
row("Igor", 200),
row("Roman", 300),
row("Ivan", 1400),
row("Alexey", 1000)
));
- IgniteTypeFactory typeFactory = ctx.getTypeFactory();
-
- RelDataType rowType = typeFactory.createStructType(F.asList(
- Pair.of("NAME", typeFactory.createSqlType(SqlTypeName.VARCHAR)),
- Pair.of("SALARY",
typeFactory.createSqlType(SqlTypeName.INTEGER))));
-
- // empty groups means SELECT SUM(field) FROM table
- ImmutableList<ImmutableBitSet> grpSets =
ImmutableList.of(ImmutableBitSet.of());
-
- // AVG on second field
AggregateCall call = AggregateCall.create(
SqlStdOperatorTable.SUM,
false,
@@ -707,13 +710,16 @@ public class ExecutionTest extends AbstractExecutionTest {
ImmutableIntList.of(1),
-1,
RelCollations.EMPTY,
- typeFactory.createSqlType(SqlTypeName.INTEGER),
+ tf.createJavaType(int.class),
null);
- AggregateNode<Object[]> agg = new AggregateNode<>(ctx, SINGLE,
grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory());
+ ImmutableList<ImmutableBitSet> grpSets =
ImmutableList.of(ImmutableBitSet.of());
+
+ RelDataType aggType = TypeUtils.createRowType(tf, int.class);
+ AggregateNode<Object[]> agg = new AggregateNode<>(ctx, aggType,
SINGLE, grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory());
agg.register(scan);
- RootNode<Object[]> root = new RootNode<>(ctx);
+ RootNode<Object[]> root = new RootNode<>(ctx, aggType);
root.register(agg);
assertTrue(root.hasNext());
@@ -725,22 +731,15 @@ public class ExecutionTest extends AbstractExecutionTest {
@Test
public void testAggregateMin() {
ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()),
UUID.randomUUID(), 0);
-
- ScanNode<Object[]> scan = new ScanNode<>(ctx, Arrays.asList(
+ IgniteTypeFactory tf = ctx.getTypeFactory();
+ RelDataType rowType = TypeUtils.createRowType(tf, String.class,
int.class);
+ ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
row("Igor", 200),
row("Roman", 300),
row("Ivan", 1400),
row("Alexey", 1000)
));
- IgniteTypeFactory typeFactory = ctx.getTypeFactory();
-
- RelDataType rowType = typeFactory.createStructType(F.asList(
- Pair.of("NAME", typeFactory.createSqlType(SqlTypeName.VARCHAR)),
- Pair.of("SALARY",
typeFactory.createSqlType(SqlTypeName.INTEGER))));
-
- ImmutableList<ImmutableBitSet> grpSets =
ImmutableList.of(ImmutableBitSet.of());
-
AggregateCall call = AggregateCall.create(
SqlStdOperatorTable.MIN,
false,
@@ -749,13 +748,16 @@ public class ExecutionTest extends AbstractExecutionTest {
ImmutableIntList.of(1),
-1,
RelCollations.EMPTY,
- typeFactory.createSqlType(SqlTypeName.INTEGER),
+ tf.createJavaType(int.class),
null);
- AggregateNode<Object[]> agg = new AggregateNode<>(ctx, SINGLE,
grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory());
+ ImmutableList<ImmutableBitSet> grpSets =
ImmutableList.of(ImmutableBitSet.of());
+
+ RelDataType aggType = TypeUtils.createRowType(tf, int.class);
+ AggregateNode<Object[]> agg = new AggregateNode<>(ctx, aggType,
SINGLE, grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory());
agg.register(scan);
- RootNode<Object[]> root = new RootNode<>(ctx);
+ RootNode<Object[]> root = new RootNode<>(ctx, aggType);
root.register(agg);
assertTrue(root.hasNext());
@@ -767,22 +769,15 @@ public class ExecutionTest extends AbstractExecutionTest {
@Test
public void testAggregateMax() {
ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()),
UUID.randomUUID(), 0);
-
- ScanNode<Object[]> scan = new ScanNode<>(ctx, Arrays.asList(
+ IgniteTypeFactory tf = ctx.getTypeFactory();
+ RelDataType rowType = TypeUtils.createRowType(tf, String.class,
int.class);
+ ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
row("Igor", 200),
row("Roman", 300),
row("Ivan", 1400),
row("Alexey", 1000)
));
- IgniteTypeFactory typeFactory = ctx.getTypeFactory();
-
- RelDataType rowType = typeFactory.createStructType(F.asList(
- Pair.of("NAME", typeFactory.createSqlType(SqlTypeName.VARCHAR)),
- Pair.of("SALARY",
typeFactory.createSqlType(SqlTypeName.INTEGER))));
-
- ImmutableList<ImmutableBitSet> grpSets =
ImmutableList.of(ImmutableBitSet.of());
-
AggregateCall call = AggregateCall.create(
SqlStdOperatorTable.MAX,
false,
@@ -791,15 +786,19 @@ public class ExecutionTest extends AbstractExecutionTest {
ImmutableIntList.of(1),
-1,
RelCollations.EMPTY,
- typeFactory.createSqlType(SqlTypeName.INTEGER),
+ tf.createJavaType(int.class),
null);
- AggregateNode<Object[]> agg = new AggregateNode<>(ctx, SINGLE,
grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory());
+ ImmutableList<ImmutableBitSet> grpSets =
ImmutableList.of(ImmutableBitSet.of());
+
+ RelDataType aggType = TypeUtils.createRowType(tf, int.class);
+ AggregateNode<Object[]> agg = new AggregateNode<>(ctx, aggType,
SINGLE, grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory());
agg.register(scan);
- RootNode<Object[]> root = new RootNode<>(ctx);
+ RootNode<Object[]> root = new RootNode<>(ctx, aggType);
root.register(agg);
+
assertTrue(root.hasNext());
assertEquals(1400, root.next()[0]);
assertFalse(root.hasNext());
@@ -809,22 +808,15 @@ public class ExecutionTest extends AbstractExecutionTest {
@Test
public void testAggregateCount() {
ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()),
UUID.randomUUID(), 0);
-
- ScanNode<Object[]> scan = new ScanNode<>(ctx, Arrays.asList(
+ IgniteTypeFactory tf = ctx.getTypeFactory();
+ RelDataType rowType = TypeUtils.createRowType(tf, String.class,
int.class);
+ ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
row("Igor", 200),
row("Roman", 300),
row("Ivan", 1400),
row("Alexey", 1000)
));
- IgniteTypeFactory typeFactory = ctx.getTypeFactory();
-
- RelDataType rowType = typeFactory.createStructType(F.asList(
- Pair.of("NAME", typeFactory.createSqlType(SqlTypeName.VARCHAR)),
- Pair.of("SALARY",
typeFactory.createSqlType(SqlTypeName.INTEGER))));
-
- ImmutableList<ImmutableBitSet> grpSets =
ImmutableList.of(ImmutableBitSet.of());
-
AggregateCall call = AggregateCall.create(
SqlStdOperatorTable.COUNT,
false,
@@ -833,13 +825,16 @@ public class ExecutionTest extends AbstractExecutionTest {
ImmutableIntList.of(),
-1,
RelCollations.EMPTY,
- typeFactory.createSqlType(SqlTypeName.INTEGER),
+ tf.createJavaType(int.class),
null);
- AggregateNode<Object[]> agg = new AggregateNode<>(ctx, SINGLE,
grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory());
+ ImmutableList<ImmutableBitSet> grpSets =
ImmutableList.of(ImmutableBitSet.of());
+
+ RelDataType aggType = TypeUtils.createRowType(tf, int.class);
+ AggregateNode<Object[]> agg = new AggregateNode<>(ctx, aggType,
SINGLE, grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory());
agg.register(scan);
- RootNode<Object[]> root = new RootNode<>(ctx);
+ RootNode<Object[]> root = new RootNode<>(ctx, aggType);
root.register(agg);
assertTrue(root.hasNext());
@@ -851,23 +846,15 @@ public class ExecutionTest extends AbstractExecutionTest {
@Test
public void testAggregateCountByGroup() {
ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()),
UUID.randomUUID(), 0);
-
- ScanNode<Object[]> scan = new ScanNode<>(ctx, Arrays.asList(
+ IgniteTypeFactory tf = ctx.getTypeFactory();
+ RelDataType rowType = TypeUtils.createRowType(tf, String.class,
int.class, int.class);
+ ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, Arrays.asList(
row("Igor", 0, 200),
row("Roman", 1, 300),
row("Ivan", 1, 1400),
row("Alexey", 0, 1000)
));
- IgniteTypeFactory typeFactory = ctx.getTypeFactory();
-
- RelDataType rowType = typeFactory.createStructType(F.asList(
- Pair.of("NAME", typeFactory.createSqlType(SqlTypeName.VARCHAR)),
- Pair.of("PROJECT_ID",
typeFactory.createSqlType(SqlTypeName.INTEGER)),
- Pair.of("SALARY",
typeFactory.createSqlType(SqlTypeName.INTEGER))));
-
- ImmutableList<ImmutableBitSet> grpSets =
ImmutableList.of(ImmutableBitSet.of(1));
-
AggregateCall call = AggregateCall.create(
SqlStdOperatorTable.COUNT,
false,
@@ -876,13 +863,16 @@ public class ExecutionTest extends AbstractExecutionTest {
ImmutableIntList.of(),
-1,
RelCollations.EMPTY,
- typeFactory.createSqlType(SqlTypeName.INTEGER),
+ tf.createJavaType(int.class),
null);
- AggregateNode<Object[]> agg = new AggregateNode<>(ctx, SINGLE,
grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory());
+ ImmutableList<ImmutableBitSet> grpSets =
ImmutableList.of(ImmutableBitSet.of(1));
+
+ RelDataType aggType = TypeUtils.createRowType(tf, int.class);
+ AggregateNode<Object[]> agg = new AggregateNode<>(ctx, aggType,
SINGLE, grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory());
agg.register(scan);
- RootNode<Object[]> root = new RootNode<>(ctx);
+ RootNode<Object[]> root = new RootNode<>(ctx, aggType);
root.register(agg);
assertTrue(root.hasNext());