This is an automated email from the ASF dual-hosted git repository.
anton pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 3ca9149 [BEAM-7844] Implementing NodeStat Estimations for all the
nodes
new 7d56a23 Merge pull request #9198 from riazela/RowRateWindowEstimation
3ca9149 is described below
commit 3ca91490e0644ab89e5febc9d139402877f939e1
Author: Alireza Samadian <[email protected]>
AuthorDate: Thu Aug 1 09:02:11 2019 -0700
[BEAM-7844] Implementing NodeStat Estimations for all the nodes
---
.../sql/impl/planner/NodeStatsMetadata.java | 4 +-
.../sql/impl/rel/BeamAggregationRel.java | 39 ++++++++-
.../sdk/extensions/sql/impl/rel/BeamCalcRel.java | 22 ++++-
.../sdk/extensions/sql/impl/rel/BeamIOSinkRel.java | 2 +-
.../extensions/sql/impl/rel/BeamIOSourceRel.java | 9 +-
.../extensions/sql/impl/rel/BeamIntersectRel.java | 14 ++-
.../sdk/extensions/sql/impl/rel/BeamJoinRel.java | 26 ++++--
.../sdk/extensions/sql/impl/rel/BeamMinusRel.java | 8 +-
.../sdk/extensions/sql/impl/rel/BeamSortRel.java | 3 +-
.../extensions/sql/impl/rel/BeamSqlRelUtils.java | 19 +++++
.../extensions/sql/impl/rel/BeamUncollectRel.java | 5 +-
.../sdk/extensions/sql/impl/rel/BeamUnionRel.java | 10 ++-
.../sdk/extensions/sql/impl/rel/BeamUnnestRel.java | 5 +-
.../sdk/extensions/sql/impl/rel/BeamValuesRel.java | 2 +-
.../extensions/sql/impl/planner/NodeStatsTest.java | 15 ++++
...rceRelTest.java => BeamAggregationRelTest.java} | 70 ++++++++++-----
...amIOSourceRelTest.java => BeamCalcRelTest.java} | 77 ++++++++++++-----
.../sql/impl/rel/BeamIOSourceRelTest.java | 43 +++++++++-
.../sql/impl/rel/BeamIntersectRelTest.java | 27 ++++++
.../impl/rel/BeamJoinRelBoundedVsBoundedTest.java | 75 ++++++++++++++++
.../rel/BeamJoinRelUnboundedVsBoundedTest.java | 41 +++++++++
.../rel/BeamJoinRelUnboundedVsUnboundedTest.java | 50 ++++++++++-
.../extensions/sql/impl/rel/BeamMinusRelTest.java | 99 +++++++++++++++++++++-
.../extensions/sql/impl/rel/BeamSortRelTest.java | 39 +++++++--
.../sql/impl/rel/BeamUncollectRelTest.java | 47 ++++++++++
.../extensions/sql/impl/rel/BeamUnionRelTest.java | 53 ++++++++++++
.../extensions/sql/impl/rel/BeamValuesRelTest.java | 24 ++++++
27 files changed, 745 insertions(+), 83 deletions(-)
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/NodeStatsMetadata.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/NodeStatsMetadata.java
index 4a9e79f..8bc62ee 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/NodeStatsMetadata.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/NodeStatsMetadata.java
@@ -35,8 +35,8 @@ public interface NodeStatsMetadata extends Metadata {
MetadataDef<NodeStatsMetadata> DEF =
MetadataDef.of(NodeStatsMetadata.class, NodeStatsMetadata.Handler.class,
METHOD);
- // In order to use this we need to call it by
relNode.metadata(RowRateWindowMetadata.class,
- // mq).getRowRateWindow() where mq is the MetadataQuery (can be obtained by
+ // In order to use this we need to call it by
relNode.metadata(NodeStatsMetadata.class,
+ // mq).getNodeStats() where mq is the MetadataQuery (can be obtained by
// relNode.getCluster().getMetadataQuery()). After this, Calcite looks for
the implementation of
// this metadata that we have registered in MetadataProvider (it is
RelMdNodeStats.class in
// this case and we have registered it in CalciteQueryPlanner). Then
Calcite's generated Code
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
index 14e7475..4e5978e 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
@@ -84,7 +84,44 @@ public class BeamAggregationRel extends Aggregate implements
BeamRelNode {
@Override
public NodeStats estimateNodeStats(RelMetadataQuery mq) {
- return NodeStats.create(mq.getRowCount(this));
+
+ NodeStats inputEstimate = BeamSqlRelUtils.getNodeStats(this.input, mq);
+
+ inputEstimate = computeWindowingCostEffect(inputEstimate);
+
+ NodeStats estimate;
+ // groupCount is the number of columns in the GROUP BY, excluding the
windowing column (if any).
+ int groupCount = groupSet.cardinality() - (windowFn == null ? 0 : 1);
+ // This is similar to what Calcite does. If groupCount is zero, then we
have only one value
+ // per window for unbounded inputs and only one value for bounded inputs, e.g.
select count(*) from A.
+ // If groupCount is non-zero, then the more columns we include in the group
by, the more rows will be
+ // preserved.
+ return (groupCount == 0)
+ ? NodeStats.create(
+ Math.min(inputEstimate.getRowCount(), 1d),
+ inputEstimate.getRate() / inputEstimate.getWindow(),
+ 1d)
+ : inputEstimate.multiply(1.0 - Math.pow(.5, groupCount));
+ }
+
+ private NodeStats computeWindowingCostEffect(NodeStats inputStat) {
+ if (windowFn == null) {
+ return inputStat;
+ }
+ WindowFn w = windowFn;
+ double multiplicationFactor = 1;
+ // If the window is a SlidingWindows, the number of tuples will increase
(because some of the
+ // tuples repeat in multiple windows).
+ if (w instanceof SlidingWindows) {
+ multiplicationFactor =
+ ((double) ((SlidingWindows) w).getSize().getStandardSeconds())
+ / ((SlidingWindows) w).getPeriod().getStandardSeconds();
+ }
+
+ return NodeStats.create(
+ inputStat.getRowCount() * multiplicationFactor,
+ inputStat.getRate() * multiplicationFactor,
+ BeamIOSourceRel.CONSTANT_WINDOW_SIZE);
}
@Override
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
index 31ae94a..755aaec 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
@@ -73,6 +73,8 @@ import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Calc;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexLocalRef;
+import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexProgram;
import org.apache.calcite.rex.RexSimplify;
import org.apache.calcite.rex.RexUtil;
@@ -216,7 +218,25 @@ public class BeamCalcRel extends Calc implements
BeamRelNode {
@Override
public NodeStats estimateNodeStats(RelMetadataQuery mq) {
- return NodeStats.create(mq.getRowCount(this));
+ NodeStats inputStat = BeamSqlRelUtils.getNodeStats(this.input, mq);
+ double selectivity = estimateFilterSelectivity(getInput(), program, mq);
+
+ return inputStat.multiply(selectivity);
+ }
+
+ private static double estimateFilterSelectivity(
+ RelNode child, RexProgram program, RelMetadataQuery mq) {
+ // Similar to Calcite, if the calc node represents a filter operation, we
estimate the filter
+ // selectivity based on the number of equality conditions, number of
inequality conditions, etc.
+ RexLocalRef programCondition = program.getCondition();
+ RexNode condition;
+ if (programCondition == null) {
+ condition = null;
+ } else {
+ condition = program.expandLocalRef(programCondition);
+ }
+ // Currently this gets the selectivity based on Calcite's Selectivity
Handler (RelMdSelectivity)
+ return mq.getSelectivity(child, condition);
}
public boolean isInputSortRelAndLimitOnly() {
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java
index 5aaa635..5738e44 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java
@@ -74,7 +74,7 @@ public class BeamIOSinkRel extends TableModify
@Override
public NodeStats estimateNodeStats(RelMetadataQuery mq) {
- return NodeStats.create(mq.getRowCount(this));
+ return BeamSqlRelUtils.getNodeStats(this.input, mq);
}
@Override
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
index d87f152..6305924 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
@@ -37,7 +37,7 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery;
/** BeamRelNode to replace a {@code TableScan} node. */
public class BeamIOSourceRel extends TableScan implements BeamRelNode {
-
+ public static final double CONSTANT_WINDOW_SIZE = 10d;
private final BeamSqlTable beamTable;
private final BeamCalciteTable calciteTable;
private final Map<String, String> pipelineOptions;
@@ -66,7 +66,12 @@ public class BeamIOSourceRel extends TableScan implements
BeamRelNode {
@Override
public NodeStats estimateNodeStats(RelMetadataQuery mq) {
- return NodeStats.create(mq.getRowCount(this));
+ BeamTableStatistics rowCountStatistics = calciteTable.getStatistic();
+ double window =
+ (beamTable.isBounded() == PCollection.IsBounded.BOUNDED)
+ ? rowCountStatistics.getRowCount()
+ : CONSTANT_WINDOW_SIZE;
+ return NodeStats.create(rowCountStatistics.getRowCount(),
rowCountStatistics.getRate(), window);
}
@Override
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java
index a90972a..7fcc6c2 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java
@@ -54,6 +54,18 @@ public class BeamIntersectRel extends Intersect implements
BeamRelNode {
@Override
public NodeStats estimateNodeStats(RelMetadataQuery mq) {
- return NodeStats.create(mq.getRowCount(this));
+ // This takes the minimum of the inputs for all the estimate factors.
+ double minimumRows = Double.POSITIVE_INFINITY;
+ double minimumWindowSize = Double.POSITIVE_INFINITY;
+ double minimumRate = Double.POSITIVE_INFINITY;
+
+ for (RelNode input : inputs) {
+ NodeStats inputEstimates = BeamSqlRelUtils.getNodeStats(input, mq);
+ minimumRows = Math.min(minimumRows, inputEstimates.getRowCount());
+ minimumRate = Math.min(minimumRate, inputEstimates.getRate());
+ minimumWindowSize = Math.min(minimumWindowSize,
inputEstimates.getWindow());
+ }
+
+ return NodeStats.create(minimumRows, minimumRate,
minimumWindowSize).multiply(0.5);
}
}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
index 167b8a5..4ec6b76 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
@@ -57,8 +57,6 @@ import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Optional;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptCost;
-import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.CorrelationId;
@@ -138,13 +136,25 @@ public class BeamJoinRel extends Join implements
BeamRelNode {
}
@Override
- public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery
mq) {
- return super.computeSelfCost(planner, mq);
- }
-
- @Override
public NodeStats estimateNodeStats(RelMetadataQuery mq) {
- return NodeStats.create(mq.getRowCount(this));
+ double selectivity = mq.getSelectivity(this, getCondition());
+ NodeStats leftEstimates = BeamSqlRelUtils.getNodeStats(this.left, mq);
+ NodeStats rightEstimates = BeamSqlRelUtils.getNodeStats(this.right, mq);
+
+ if (leftEstimates.isUnknown() || rightEstimates.isUnknown()) {
+ return NodeStats.UNKNOWN;
+ }
+ // If any of the inputs is unbounded, the row count becomes zero (one of the
row counts would be zero).
+ // If one is bounded and one unbounded, the rate will be the window of the
bounded input (= its row count)
+ // multiplied by the rate of the unbounded one.
+ // If both are unbounded, the rate will be the sum of each input's rate
multiplied by the window of the
+ // other.
+ return NodeStats.create(
+ leftEstimates.getRowCount() * rightEstimates.getRowCount() *
selectivity,
+ (leftEstimates.getRate() * rightEstimates.getWindow()
+ + rightEstimates.getRate() * leftEstimates.getWindow())
+ * selectivity,
+ leftEstimates.getWindow() * rightEstimates.getWindow() * selectivity);
}
/**
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java
index 0a8103d..416c86b 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java
@@ -54,6 +54,12 @@ public class BeamMinusRel extends Minus implements
BeamRelNode {
@Override
public NodeStats estimateNodeStats(RelMetadataQuery mq) {
- return NodeStats.create(mq.getRowCount(this));
+ NodeStats firstInputEstimates =
BeamSqlRelUtils.getNodeStats(inputs.get(0), mq);
+ // The first input minus half of each of the others (we assume half of each
of them is in the intersection).
+ for (int i = 1; i < inputs.size(); i++) {
+ NodeStats inputEstimate = BeamSqlRelUtils.getNodeStats(inputs.get(i),
mq);
+ firstInputEstimates =
firstInputEstimates.minus(inputEstimate.multiply(0.5));
+ }
+ return firstInputEstimates;
}
}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
index f252f11..61f7858 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
@@ -138,7 +138,8 @@ public class BeamSortRel extends Sort implements
BeamRelNode {
@Override
public NodeStats estimateNodeStats(RelMetadataQuery mq) {
- return NodeStats.create(mq.getRowCount(this));
+ // Sorting does not change rate or row count of the input.
+ return BeamSqlRelUtils.getNodeStats(this.input, mq);
}
public boolean isLimitOnly() {
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSqlRelUtils.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSqlRelUtils.java
index 9f0fd55..fb44f28 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSqlRelUtils.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSqlRelUtils.java
@@ -22,12 +22,15 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStatsMetadata;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.Row;
import org.apache.calcite.plan.volcano.RelSubset;
import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
/** Utilities for {@code BeamRelNode}. */
public class BeamSqlRelUtils {
@@ -76,4 +79,20 @@ public class BeamSqlRelUtils {
}
return (BeamRelNode) input;
}
+
+ public static RelNode getInput(RelNode input) {
+ RelNode result = input;
+ if (input instanceof RelSubset) {
+ // go with known best input
+ result = ((RelSubset) input).getBest();
+ result = result == null ? ((RelSubset) input).getOriginal() : result;
+ }
+
+ return result;
+ }
+
+ public static NodeStats getNodeStats(RelNode input, RelMetadataQuery mq) {
+ input = getInput(input);
+ return input.metadata(NodeStatsMetadata.class, mq).getNodeStats();
+ }
}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUncollectRel.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUncollectRel.java
index 28d2d69..7bca5c4 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUncollectRel.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUncollectRel.java
@@ -76,7 +76,10 @@ public class BeamUncollectRel extends Uncollect implements
BeamRelNode {
@Override
public NodeStats estimateNodeStats(RelMetadataQuery mq) {
- return NodeStats.create(mq.getRowCount(this));
+ // We estimate the average length of each array by a constant.
+ // We might be able to get an estimate of the length by making a
MetadataHandler for this
+ // purpose, and get the estimate by reading the first couple of the rows
in the source.
+ return BeamSqlRelUtils.getNodeStats(this.input, mq).multiply(2);
}
private static class UncollectDoFn extends DoFn<Row, Row> {
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java
index 95a1826..b4a6670 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java
@@ -81,6 +81,14 @@ public class BeamUnionRel extends Union implements
BeamRelNode {
@Override
public NodeStats estimateNodeStats(RelMetadataQuery mq) {
- return NodeStats.create(mq.getRowCount(this));
+ // The summation of the input stats
+ NodeStats summationOfEstimates =
+ inputs.stream()
+ .map(input -> BeamSqlRelUtils.getNodeStats(input, mq))
+ .reduce(NodeStats.create(0, 0, 0), NodeStats::plus);
+ // If all is set, duplicate values are propagated. Otherwise, we assume a
constant fraction (half) of
+ // them are duplicates.
+ summationOfEstimates = all ? summationOfEstimates :
summationOfEstimates.multiply(0.5);
+ return summationOfEstimates;
}
}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnnestRel.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnnestRel.java
index b51df06..1687dec 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnnestRel.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnnestRel.java
@@ -78,7 +78,10 @@ public class BeamUnnestRel extends Uncollect implements
BeamRelNode {
@Override
public NodeStats estimateNodeStats(RelMetadataQuery mq) {
- return NodeStats.create(mq.getRowCount(this));
+ // We estimate the average length of each array by a constant.
+ // We might be able to get an estimate of the length by making a
MetadataHandler for this
+ // purpose, and get the estimate by reading the first couple of the rows
in the source.
+ return BeamSqlRelUtils.getNodeStats(this.input, mq).multiply(2);
}
@Override
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
index c4405ba..1ad51b6 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
@@ -96,6 +96,6 @@ public class BeamValuesRel extends Values implements
BeamRelNode {
@Override
public NodeStats estimateNodeStats(RelMetadataQuery mq) {
- return NodeStats.create(mq.getRowCount(this));
+ return NodeStats.create(tuples.size(), 0, tuples.size());
}
}
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/planner/NodeStatsTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/planner/NodeStatsTest.java
index 820f4fc..10e0b61 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/planner/NodeStatsTest.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/planner/NodeStatsTest.java
@@ -18,10 +18,12 @@
package org.apache.beam.sdk.extensions.sql.impl.planner;
import org.apache.beam.sdk.extensions.sql.impl.rel.BaseRelTest;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.plan.volcano.RelSubset;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.SingleRel;
import org.junit.Assert;
@@ -76,4 +78,17 @@ public class NodeStatsTest extends BaseRelTest {
root.metadata(NodeStatsMetadata.class,
root.getCluster().getMetadataQuery()).getNodeStats();
Assert.assertFalse(nodeStats.isUnknown());
}
+
+ @Test
+ public void testSubsetHavingBest() {
+ String sql = " select * from ORDER_DETAILS1 ";
+ RelNode root = env.parseQuery(sql);
+ root = root.getCluster().getPlanner().getRoot();
+
+ // Make sure the root is a RelSubset, so this test exercises the RelSubset path of getNodeStats.
+ Assert.assertTrue(root instanceof RelSubset);
+
+ NodeStats estimates = BeamSqlRelUtils.getNodeStats(root,
root.getCluster().getMetadataQuery());
+ Assert.assertFalse(estimates.isUnknown());
+ }
}
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRelTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRelTest.java
similarity index 60%
copy from
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRelTest.java
copy to
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRelTest.java
index 3da71e0..df9305b 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRelTest.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRelTest.java
@@ -19,25 +19,21 @@ package org.apache.beam.sdk.extensions.sql.impl.rel;
import java.math.BigDecimal;
import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
import
org.apache.beam.sdk.extensions.sql.meta.provider.test.TestUnboundedTable;
import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.BeforeClass;
-import org.junit.Rule;
import org.junit.Test;
-/** Test for {@code BeamIOSourceRel}. */
-public class BeamIOSourceRelTest extends BaseRelTest {
- @Rule public final TestPipeline pipeline = TestPipeline.create();
-
- public static final DateTime FIRST_DATE = new DateTime(1);
- public static final DateTime SECOND_DATE = new DateTime(1 + 3600 * 1000);
+/** Tests related to {@code BeamAggregationRel}. */
+public class BeamAggregationRelTest extends BaseRelTest {
+ private static final DateTime FIRST_DATE = new DateTime(1);
+ private static final DateTime SECOND_DATE = new DateTime(1 + 3600 * 1000);
private static final Duration WINDOW_SIZE = Duration.standardHours(1);
@@ -100,29 +96,57 @@ public class BeamIOSourceRelTest extends BaseRelTest {
.setStatistics(BeamTableStatistics.createUnboundedTableStatistics(2d)));
}
- @Test
- public void boundedRowCount() {
- String sql = "SELECT * FROM ORDER_DETAILS_BOUNDED";
-
+ private NodeStats getEstimateOf(String sql) {
RelNode root = env.parseQuery(sql);
- while (!(root instanceof BeamIOSourceRel)) {
- root = env.parseQuery(sql).getInput(0);
+ while (!(root instanceof BeamAggregationRel)) {
+ root = root.getInput(0);
}
- Assert.assertEquals(5d,
root.estimateRowCount(RelMetadataQuery.instance()), 0.001);
+ return BeamSqlRelUtils.getNodeStats(root,
root.getCluster().getMetadataQuery());
}
@Test
- public void unboundedRowCount() {
- String sql = "SELECT * FROM ORDER_DETAILS_UNBOUNDED";
+ public void testNodeStats() {
+ String sql = "SELECT order_id FROM ORDER_DETAILS_BOUNDED " + " GROUP BY
order_id ";
- RelNode root = env.parseQuery(sql);
+ NodeStats estimate = getEstimateOf(sql);
- while (!(root instanceof BeamIOSourceRel)) {
- root = env.parseQuery(sql).getInput(0);
- }
+ Assert.assertEquals(5d / 2, estimate.getRowCount(), 0.001);
+ Assert.assertEquals(5d / 2, estimate.getWindow(), 0.001);
+ Assert.assertEquals(0., estimate.getRate(), 0.001);
+ }
+
+ @Test
+ public void testNodeStatsEffectOfGroupSet() {
+ String sql1 = "SELECT order_id FROM ORDER_DETAILS_BOUNDED " + " GROUP BY
order_id ";
+ String sql2 =
+ "SELECT order_id, site_id FROM ORDER_DETAILS_BOUNDED " + " GROUP BY
order_id, site_id ";
- Assert.assertEquals(2d,
root.estimateRowCount(RelMetadataQuery.instance()), 0.001);
+ NodeStats estimate1 = getEstimateOf(sql1);
+
+ NodeStats estimate2 = getEstimateOf(sql2);
+
+ Assert.assertTrue(estimate1.getRowCount() < estimate2.getRowCount());
+ Assert.assertTrue(estimate1.getWindow() < estimate2.getWindow());
+ }
+
+ @Test
+ public void testNodeStatsUnboundedWindow() {
+ String sql =
+ "select order_id, sum(site_id) as sum_site_id FROM
ORDER_DETAILS_UNBOUNDED "
+ + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)";
+ NodeStats estimate1 = getEstimateOf(sql);
+ Assert.assertEquals(1d, estimate1.getRate(), 0.01);
+ Assert.assertEquals(BeamIOSourceRel.CONSTANT_WINDOW_SIZE / 2,
estimate1.getWindow(), 0.01);
+ }
+
+ @Test
+ public void testNodeStatsSlidingWindow() {
+ String sql =
+ "select order_id, sum(site_id) as sum_site_id FROM
ORDER_DETAILS_UNBOUNDED "
+ + " GROUP BY order_id, HOP(order_time, INTERVAL '1'
SECOND,INTERVAL '3' SECOND)";
+ NodeStats estimate1 = getEstimateOf(sql);
+ Assert.assertEquals(3d, estimate1.getRate(), 0.01);
}
}
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRelTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRelTest.java
similarity index 55%
copy from
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRelTest.java
copy to
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRelTest.java
index 3da71e0..ad64f0d 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRelTest.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRelTest.java
@@ -19,25 +19,21 @@ package org.apache.beam.sdk.extensions.sql.impl.rel;
import java.math.BigDecimal;
import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
import
org.apache.beam.sdk.extensions.sql.meta.provider.test.TestUnboundedTable;
import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.BeforeClass;
-import org.junit.Rule;
import org.junit.Test;
-/** Test for {@code BeamIOSourceRel}. */
-public class BeamIOSourceRelTest extends BaseRelTest {
- @Rule public final TestPipeline pipeline = TestPipeline.create();
-
- public static final DateTime FIRST_DATE = new DateTime(1);
- public static final DateTime SECOND_DATE = new DateTime(1 + 3600 * 1000);
+/** Tests related to {@code BeamCalcRel}. */
+public class BeamCalcRelTest extends BaseRelTest {
+ private static final DateTime FIRST_DATE = new DateTime(1);
+ private static final DateTime SECOND_DATE = new DateTime(1 + 3600 * 1000);
private static final Duration WINDOW_SIZE = Duration.standardHours(1);
@@ -101,28 +97,67 @@ public class BeamIOSourceRelTest extends BaseRelTest {
}
@Test
- public void boundedRowCount() {
- String sql = "SELECT * FROM ORDER_DETAILS_BOUNDED";
+ public void testProjectionNodeStats() {
+ String sql = "SELECT order_id FROM ORDER_DETAILS_BOUNDED";
RelNode root = env.parseQuery(sql);
- while (!(root instanceof BeamIOSourceRel)) {
- root = env.parseQuery(sql).getInput(0);
- }
+ Assert.assertTrue(root instanceof BeamCalcRel);
+
+ NodeStats estimate = BeamSqlRelUtils.getNodeStats(root,
root.getCluster().getMetadataQuery());
- Assert.assertEquals(5d,
root.estimateRowCount(RelMetadataQuery.instance()), 0.001);
+ Assert.assertEquals(5d, estimate.getRowCount(), 0.001);
+ Assert.assertEquals(5d, estimate.getWindow(), 0.001);
+ Assert.assertEquals(0., estimate.getRate(), 0.001);
}
@Test
- public void unboundedRowCount() {
- String sql = "SELECT * FROM ORDER_DETAILS_UNBOUNDED";
+ public void testFilterNodeStats() {
+ String sql = "SELECT * FROM ORDER_DETAILS_BOUNDED where order_id=1";
RelNode root = env.parseQuery(sql);
- while (!(root instanceof BeamIOSourceRel)) {
- root = env.parseQuery(sql).getInput(0);
- }
+ Assert.assertTrue(root instanceof BeamCalcRel);
+
+ NodeStats estimate = BeamSqlRelUtils.getNodeStats(root,
root.getCluster().getMetadataQuery());
+
+ Assert.assertTrue(5d > estimate.getRowCount());
+ Assert.assertTrue(5d > estimate.getWindow());
+ Assert.assertEquals(0., estimate.getRate(), 0.001);
+ }
+
+ @Test
+ public void testNodeStatsConditionType() {
+ String equalSql = "SELECT * FROM ORDER_DETAILS_BOUNDED where order_id=1";
+ String geqSql = "SELECT * FROM ORDER_DETAILS_BOUNDED where order_id>=1";
+
+ RelNode equalRoot = env.parseQuery(equalSql);
+ RelNode geqRoot = env.parseQuery(geqSql);
+
+ NodeStats equalEstimate =
+ BeamSqlRelUtils.getNodeStats(equalRoot,
equalRoot.getCluster().getMetadataQuery());
+ NodeStats geqEstimate =
+ BeamSqlRelUtils.getNodeStats(geqRoot,
geqRoot.getCluster().getMetadataQuery());
+
+ Assert.assertTrue(geqEstimate.getRowCount() > equalEstimate.getRowCount());
+ Assert.assertTrue(geqEstimate.getWindow() > equalEstimate.getWindow());
+ }
+
+ @Test
+ public void testNodeStatsNumberOfConditions() {
+ String equalSql = "SELECT * FROM ORDER_DETAILS_BOUNDED where order_id=1";
+ String doubleEqualSql = "SELECT * FROM ORDER_DETAILS_BOUNDED WHERE
order_id=1 AND site_id=2 ";
+
+ RelNode equalRoot = env.parseQuery(equalSql);
+ RelNode doubleEqualRoot = env.parseQuery(doubleEqualSql);
+
+ NodeStats equalEstimate =
+ BeamSqlRelUtils.getNodeStats(equalRoot,
equalRoot.getCluster().getMetadataQuery());
+ NodeStats doubleEqualEstimate =
+ BeamSqlRelUtils.getNodeStats(
+ doubleEqualRoot, doubleEqualRoot.getCluster().getMetadataQuery());
- Assert.assertEquals(2d,
root.estimateRowCount(RelMetadataQuery.instance()), 0.001);
+ Assert.assertTrue(doubleEqualEstimate.getRowCount() <
equalEstimate.getRowCount());
+ Assert.assertTrue(doubleEqualEstimate.getWindow() <
equalEstimate.getWindow());
}
}
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRelTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRelTest.java
index 3da71e0..22fb229 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRelTest.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRelTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.extensions.sql.impl.rel;
import java.math.BigDecimal;
import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
import
org.apache.beam.sdk.extensions.sql.meta.provider.test.TestUnboundedTable;
import org.apache.beam.sdk.schemas.Schema;
@@ -36,8 +37,8 @@ import org.junit.Test;
public class BeamIOSourceRelTest extends BaseRelTest {
@Rule public final TestPipeline pipeline = TestPipeline.create();
- public static final DateTime FIRST_DATE = new DateTime(1);
- public static final DateTime SECOND_DATE = new DateTime(1 + 3600 * 1000);
+ // SECOND_DATE is 3600 * 1000 ms (one hour, i.e. one WINDOW_SIZE) after FIRST_DATE.
+ private static final DateTime FIRST_DATE = new DateTime(1);
+ private static final DateTime SECOND_DATE = new DateTime(1 + 3600 * 1000);
private static final Duration WINDOW_SIZE = Duration.standardHours(1);
@@ -107,7 +108,7 @@ public class BeamIOSourceRelTest extends BaseRelTest {
RelNode root = env.parseQuery(sql);
while (!(root instanceof BeamIOSourceRel)) {
- root = env.parseQuery(sql).getInput(0);
+ root = root.getInput(0);
}
Assert.assertEquals(5d,
root.estimateRowCount(RelMetadataQuery.instance()), 0.001);
@@ -120,9 +121,43 @@ public class BeamIOSourceRelTest extends BaseRelTest {
RelNode root = env.parseQuery(sql);
while (!(root instanceof BeamIOSourceRel)) {
- root = env.parseQuery(sql).getInput(0);
+ root = root.getInput(0);
}
Assert.assertEquals(2d,
root.estimateRowCount(RelMetadataQuery.instance()), 0.001);
}
+
+ /**
+ * Estimates for the bounded source node: expects a row count and window of 5 and a rate of 0.
+ * The value 5 presumably matches the ORDER_DETAILS_BOUNDED fixture size — confirm in prepare().
+ */
+ @Test
+ public void testBoundedNodeStats() {
+ String sql = "SELECT * FROM ORDER_DETAILS_BOUNDED";
+
+ RelNode root = env.parseQuery(sql);
+
+ // Descend through first inputs until the source scan node is reached.
+ while (!(root instanceof BeamIOSourceRel)) {
+ root = root.getInput(0);
+ }
+
+ NodeStats estimate = BeamSqlRelUtils.getNodeStats(root,
root.getCluster().getMetadataQuery());
+
+ Assert.assertEquals(5d, estimate.getRowCount(), 0.01);
+ Assert.assertEquals(0d, estimate.getRate(), 0.01);
+ Assert.assertEquals(5d, estimate.getWindow(), 0.01);
+ }
+
+ /**
+ * Estimates for the unbounded source node: expects a row count of 0, a rate of 2 and a window
+ * equal to {@code BeamIOSourceRel.CONSTANT_WINDOW_SIZE}.
+ */
+ @Test
+ public void testUnboundedNodeStats() {
+ String sql = "SELECT * FROM ORDER_DETAILS_UNBOUNDED";
+
+ RelNode root = env.parseQuery(sql);
+
+ // Descend through first inputs until the source scan node is reached.
+ while (!(root instanceof BeamIOSourceRel)) {
+ root = root.getInput(0);
+ }
+
+ NodeStats estimate = BeamSqlRelUtils.getNodeStats(root,
root.getCluster().getMetadataQuery());
+
+ Assert.assertEquals(0d, estimate.getRowCount(), 0.01);
+ Assert.assertEquals(2d, estimate.getRate(), 0.01);
+ Assert.assertEquals(BeamIOSourceRel.CONSTANT_WINDOW_SIZE,
estimate.getWindow(), 0.01);
+ }
}
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRelTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRelTest.java
index 1bcbed4..2b58272 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRelTest.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRelTest.java
@@ -19,12 +19,15 @@ package org.apache.beam.sdk.extensions.sql.impl.rel;
import java.math.BigDecimal;
import org.apache.beam.sdk.extensions.sql.TestUtils;
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
+import org.apache.calcite.rel.RelNode;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
@@ -121,4 +124,28 @@ public class BeamIntersectRelTest extends BaseRelTest {
pipeline.run();
}
+
+ /**
+ * Estimates for an INTERSECT of two bounded tables: the stats must be known, with a rate of 0
+ * and a row count and window of 3/2. The 3/2 value depends on the ORDER_DETAILS1/2 fixture
+ * estimates, which are not visible here — confirm in prepare().
+ */
+ @Test
+ public void testNodeStatsEstimation() {
+ String sql =
+ "SELECT order_id, site_id, price "
+ + " FROM ORDER_DETAILS1 "
+ + " INTERSECT "
+ + " SELECT order_id, site_id, price "
+ + " FROM ORDER_DETAILS2 ";
+
+ RelNode root = env.parseQuery(sql);
+
+ // Descend through first inputs until the intersect node is reached.
+ while (!(root instanceof BeamIntersectRel)) {
+ root = root.getInput(0);
+ }
+
+ NodeStats estimate = BeamSqlRelUtils.getNodeStats(root,
root.getCluster().getMetadataQuery());
+
+ Assert.assertFalse(estimate.isUnknown());
+ Assert.assertEquals(0d, estimate.getRate(), 0.01);
+
+ Assert.assertEquals(3. / 2., estimate.getRowCount(), 0.01);
+ Assert.assertEquals(3. / 2., estimate.getWindow(), 0.01);
+ }
}
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java
index d8e8a61..f208ecc 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java
@@ -18,13 +18,16 @@
package org.apache.beam.sdk.extensions.sql.impl.rel;
import org.apache.beam.sdk.extensions.sql.TestUtils;
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
+import org.apache.calcite.rel.RelNode;
import org.hamcrest.core.StringContains;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
@@ -82,6 +85,78 @@ public class BeamJoinRelBoundedVsBoundedTest extends
BaseRelTest {
}
+ /**
+ * Equi-join of two bounded tables: the estimate must be known, with a rate of 0. The row count
+ * and window must be non-zero and strictly below the product of the two inputs' estimates
+ * (the cross-product upper bound).
+ */
@Test
+ public void testNodeStatsEstimation() {
+ String sql =
+ "SELECT * "
+ + " FROM ORDER_DETAILS1 o1 "
+ + " JOIN ORDER_DETAILS2 o2 "
+ + " on "
+ + " o1.order_id=o2.site_id ";
+
+ RelNode root = env.parseQuery(sql);
+
+ // Descend through first inputs until the join node is reached.
+ while (!(root instanceof BeamJoinRel)) {
+ root = root.getInput(0);
+ }
+
+ NodeStats estimate = BeamSqlRelUtils.getNodeStats(root,
root.getCluster().getMetadataQuery());
+ NodeStats leftEstimate =
+ BeamSqlRelUtils.getNodeStats(
+ ((BeamJoinRel) root).getLeft(),
root.getCluster().getMetadataQuery());
+ NodeStats rightEstimate =
+ BeamSqlRelUtils.getNodeStats(
+ ((BeamJoinRel) root).getRight(),
root.getCluster().getMetadataQuery());
+
+ Assert.assertFalse(estimate.isUnknown());
+ Assert.assertEquals(0d, estimate.getRate(), 0.01);
+
+ Assert.assertNotEquals(0d, estimate.getRowCount(), 0.001);
+ Assert.assertTrue(
+ estimate.getRowCount() < leftEstimate.getRowCount() *
rightEstimate.getRowCount());
+
+ Assert.assertNotEquals(0d, estimate.getWindow(), 0.001);
+ Assert.assertTrue(estimate.getWindow() < leftEstimate.getWindow() *
rightEstimate.getWindow());
+ }
+
+ /**
+ * Verifies that adding a second equi-join condition lowers the estimated join row count
+ * compared with the same join on a single condition.
+ */
+ @Test
+ public void testNodeStatsOfMoreConditions() {
+ String sql1 =
+ "SELECT * "
+ + " FROM ORDER_DETAILS1 o1 "
+ + " JOIN ORDER_DETAILS2 o2 "
+ + " on "
+ + " o1.order_id=o2.site_id ";
+
+ String sql2 =
+ "SELECT * "
+ + " FROM ORDER_DETAILS1 o1 "
+ + " JOIN ORDER_DETAILS2 o2 "
+ + " on "
+ + " o1.order_id=o2.site_id AND o2.price=o1.site_id";
+
+ RelNode root1 = env.parseQuery(sql1);
+
+ // Descend through first inputs until the join node of each plan is reached.
+ while (!(root1 instanceof BeamJoinRel)) {
+ root1 = root1.getInput(0);
+ }
+
+ RelNode root2 = env.parseQuery(sql2);
+
+ while (!(root2 instanceof BeamJoinRel)) {
+ root2 = root2.getInput(0);
+ }
+
+ NodeStats estimate1 =
+ BeamSqlRelUtils.getNodeStats(root1, root1.getCluster().getMetadataQuery());
+ // Estimate root2 with its own cluster's metadata query, exactly as root1 is estimated above
+ // (previously this mistakenly used root1's cluster).
+ NodeStats estimate2 =
+ BeamSqlRelUtils.getNodeStats(root2, root2.getCluster().getMetadataQuery());
+
+ Assert.assertNotEquals(0d, estimate2.getRowCount(), 0.001);
+ // A join with two conditions should have lower estimate.
+ Assert.assertTrue(estimate2.getRowCount() < estimate1.getRowCount());
+ }
+
+ @Test
public void testLeftOuterJoin() throws Exception {
String sql =
"SELECT * "
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
index 190b091..e1a5d8b 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
@@ -21,6 +21,7 @@ import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.BeamSqlSeekableTable;
import org.apache.beam.sdk.extensions.sql.TestUtils;
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable;
import
org.apache.beam.sdk.extensions.sql.impl.transform.BeamSqlOutputToConsoleFn;
import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
@@ -34,8 +35,10 @@ import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.Row;
+import org.apache.calcite.rel.RelNode;
import org.joda.time.DateTime;
import org.joda.time.Duration;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
@@ -178,6 +181,44 @@ public class BeamJoinRelUnboundedVsBoundedTest extends
BaseRelTest {
}
+ /**
+ * Join of a windowed aggregate over the unbounded ORDER_DETAILS with the bounded ORDER_DETAILS1.
+ * The estimate must be known, with a row count of 0 and a non-zero rate. The window must be
+ * non-zero and below the product of the inputs' windows.
+ */
@Test
+ public void testNodeStatsEstimation() {
+ String sql =
+ "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM "
+ + "(select order_id, sum(site_id) as sum_site_id FROM
ORDER_DETAILS "
+ + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1'
HOUR)) o1 "
+ + " JOIN "
+ + " ORDER_DETAILS1 o2 "
+ + " on "
+ + " o1.order_id=o2.order_id";
+
+ RelNode root = env.parseQuery(sql);
+
+ // Descend through first inputs until the join node is reached.
+ while (!(root instanceof BeamJoinRel)) {
+ root = root.getInput(0);
+ }
+
+ NodeStats estimate = BeamSqlRelUtils.getNodeStats(root,
root.getCluster().getMetadataQuery());
+ NodeStats leftEstimate =
+ BeamSqlRelUtils.getNodeStats(
+ ((BeamJoinRel) root).getLeft(),
root.getCluster().getMetadataQuery());
+ NodeStats rightEstimate =
+ BeamSqlRelUtils.getNodeStats(
+ ((BeamJoinRel) root).getRight(),
root.getCluster().getMetadataQuery());
+
+ Assert.assertFalse(estimate.isUnknown());
+ Assert.assertEquals(0d, estimate.getRowCount(), 0.01);
+
+ Assert.assertNotEquals(0d, estimate.getRate(), 0.001);
+ // NOTE(review): this upper bound multiplies each side's getRowCount() by the other side's
+ // getWindow(), whereas the unbounded-vs-unbounded test bounds the rate with getRate(). The
+ // unbounded (left) side presumably reports a row count of 0, so confirm which bound is
+ // intended here.
+ Assert.assertTrue(
+ estimate.getRate()
+ < leftEstimate.getRowCount() * rightEstimate.getWindow()
+ + rightEstimate.getRowCount() * leftEstimate.getWindow());
+
+ Assert.assertNotEquals(0d, estimate.getWindow(), 0.001);
+ Assert.assertTrue(estimate.getWindow() < leftEstimate.getWindow() *
rightEstimate.getWindow());
+ }
+
+ @Test
public void testLeftOuterJoin() throws Exception {
String sql =
"SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM "
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java
index 869b1b8..1f19cf8 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java
@@ -18,6 +18,8 @@
package org.apache.beam.sdk.extensions.sql.impl.rel;
import org.apache.beam.sdk.extensions.sql.TestUtils;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
import
org.apache.beam.sdk.extensions.sql.impl.transform.BeamSqlOutputToConsoleFn;
import
org.apache.beam.sdk.extensions.sql.meta.provider.test.TestUnboundedTable;
import org.apache.beam.sdk.schemas.Schema;
@@ -26,8 +28,10 @@ import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
+import org.apache.calcite.rel.RelNode;
import org.joda.time.DateTime;
import org.joda.time.Duration;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
@@ -35,8 +39,8 @@ import org.junit.Test;
/** Unbounded + Unbounded Test for {@code BeamJoinRel}. */
public class BeamJoinRelUnboundedVsUnboundedTest extends BaseRelTest {
@Rule public final TestPipeline pipeline = TestPipeline.create();
- public static final DateTime FIRST_DATE = new DateTime(1);
- public static final DateTime SECOND_DATE = new DateTime(1 + 3600 * 1000);
+ // SECOND_DATE is 3600 * 1000 ms (one hour, i.e. one WINDOW_SIZE) after FIRST_DATE.
+ private static final DateTime FIRST_DATE = new DateTime(1);
+ private static final DateTime SECOND_DATE = new DateTime(1 + 3600 * 1000);
private static final Duration WINDOW_SIZE = Duration.standardHours(1);
@@ -72,7 +76,8 @@ public class BeamJoinRelUnboundedVsUnboundedTest extends
BaseRelTest {
2,
3,
3,
- SECOND_DATE));
+ SECOND_DATE)
+
.setStatistics(BeamTableStatistics.createUnboundedTableStatistics(3d)));
}
@Test
@@ -103,6 +108,45 @@ public class BeamJoinRelUnboundedVsUnboundedTest extends
BaseRelTest {
}
+ /**
+ * Self-join of two windowed aggregates over the unbounded ORDER_DETAILS. The estimate must be
+ * known, with a row count of 0. The rate must be non-zero and below
+ * leftRate * rightWindow + rightRate * leftWindow. The window must be non-zero and below the
+ * product of the inputs' windows.
+ */
@Test
+ public void testNodeStatsEstimation() {
+ String sql =
+ "SELECT * FROM "
+ + "(select order_id, sum(site_id) as sum_site_id FROM
ORDER_DETAILS "
+ + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1'
HOUR)) o1 "
+ + " JOIN "
+ + "(select order_id, sum(site_id) as sum_site_id FROM
ORDER_DETAILS "
+ + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1'
HOUR)) o2 "
+ + " on "
+ + " o1.order_id=o2.order_id";
+
+ RelNode root = env.parseQuery(sql);
+
+ // Descend through first inputs until the join node is reached.
+ while (!(root instanceof BeamJoinRel)) {
+ root = root.getInput(0);
+ }
+
+ NodeStats estimate = BeamSqlRelUtils.getNodeStats(root,
root.getCluster().getMetadataQuery());
+ NodeStats leftEstimate =
+ BeamSqlRelUtils.getNodeStats(
+ ((BeamJoinRel) root).getLeft(),
root.getCluster().getMetadataQuery());
+ NodeStats rightEstimate =
+ BeamSqlRelUtils.getNodeStats(
+ ((BeamJoinRel) root).getRight(),
root.getCluster().getMetadataQuery());
+
+ Assert.assertFalse(estimate.isUnknown());
+ Assert.assertEquals(0d, estimate.getRowCount(), 0.01);
+
+ Assert.assertNotEquals(0d, estimate.getRate(), 0.001);
+ Assert.assertTrue(
+ estimate.getRate()
+ < leftEstimate.getRate() * rightEstimate.getWindow()
+ + rightEstimate.getRate() * leftEstimate.getWindow());
+
+ Assert.assertNotEquals(0d, estimate.getWindow(), 0.001);
+ Assert.assertTrue(estimate.getWindow() < leftEstimate.getWindow() *
rightEstimate.getWindow());
+ }
+
+ @Test
public void testLeftOuterJoin() throws Exception {
String sql =
"SELECT * FROM "
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java
index 6484360..c29eeb2 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java
@@ -19,12 +19,19 @@ package org.apache.beam.sdk.extensions.sql.impl.rel;
import java.math.BigDecimal;
import org.apache.beam.sdk.extensions.sql.TestUtils;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
+import
org.apache.beam.sdk.extensions.sql.meta.provider.test.TestUnboundedTable;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
+import org.apache.calcite.rel.RelNode;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
@@ -33,6 +40,11 @@ import org.junit.Test;
public class BeamMinusRelTest extends BaseRelTest {
@Rule public final TestPipeline pipeline = TestPipeline.create();
+ // Event timestamps for the unbounded test table; SECOND_DATE is 3600 * 1000 ms (one hour,
+ // i.e. one WINDOW_SIZE) after FIRST_DATE.
+ private static final DateTime FIRST_DATE = new DateTime(1);
+ private static final DateTime SECOND_DATE = new DateTime(1 + 3600 * 1000);
+
+ private static final Duration WINDOW_SIZE = Duration.standardHours(1);
+
@BeforeClass
public static void prepare() {
registerTable(
@@ -74,10 +86,43 @@ public class BeamMinusRelTest extends BaseRelTest {
3L,
3,
new BigDecimal(3.0)));
+
+ registerTable(
+ "ORDER_DETAILS_UNBOUNDED",
+ TestUnboundedTable.of(
+ Schema.FieldType.INT32, "order_id",
+ Schema.FieldType.INT32, "site_id",
+ Schema.FieldType.INT32, "price",
+ Schema.FieldType.DATETIME, "order_time")
+ .timestampColumnIndex(3)
+ .addRows(Duration.ZERO, 1, 1, 1, FIRST_DATE, 1, 2, 6, FIRST_DATE)
+ .addRows(
+ WINDOW_SIZE.plus(Duration.standardMinutes(1)),
+ 2,
+ 2,
+ 7,
+ SECOND_DATE,
+ 2,
+ 3,
+ 8,
+ SECOND_DATE,
+ // this late record is omitted(First window)
+ 1,
+ 3,
+ 3,
+ FIRST_DATE)
+ .addRows(
+ // this late record is omitted(Second window)
+
WINDOW_SIZE.plus(WINDOW_SIZE).plus(Duration.standardMinutes(1)),
+ 2,
+ 3,
+ 3,
+ SECOND_DATE)
+
.setStatistics(BeamTableStatistics.createUnboundedTableStatistics(4d)));
}
@Test
- public void testExcept() throws Exception {
+ public void testExcept() {
String sql = "";
sql +=
"SELECT order_id, site_id, price "
@@ -100,7 +145,7 @@ public class BeamMinusRelTest extends BaseRelTest {
}
@Test
- public void testExceptAll() throws Exception {
+ public void testExceptAll() {
String sql = "";
sql +=
"SELECT order_id, site_id, price "
@@ -134,7 +179,7 @@ public class BeamMinusRelTest extends BaseRelTest {
}
@Test
- public void testExceptRemovesDuplicates() throws Exception {
+ public void testExceptRemovesDuplicates() {
String sql = "(SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 1) EXCEPT
SELECT 1";
PCollection<Row> rows = compilePipeline(sql, pipeline);
@@ -146,4 +191,52 @@ public class BeamMinusRelTest extends BaseRelTest {
pipeline.run();
}
+
+ /**
+ * Estimates for EXCEPT ALL between two bounded tables: the stats must be known, with a rate of
+ * 0 and a row count and window of 5 - 3/2. The split of that expression into input size and
+ * overlap is not visible here — confirm against the BeamMinusRel estimate.
+ */
+ @Test
+ public void testNodeStatsEstimation() {
+ String sql =
+ "SELECT order_id, site_id, price "
+ + "FROM ORDER_DETAILS1 "
+ + " EXCEPT ALL "
+ + "SELECT order_id, site_id, price "
+ + "FROM ORDER_DETAILS2 ";
+
+ RelNode root = env.parseQuery(sql);
+
+ // Descend through first inputs until the minus node is reached.
+ while (!(root instanceof BeamMinusRel)) {
+ root = root.getInput(0);
+ }
+
+ NodeStats estimate = BeamSqlRelUtils.getNodeStats(root,
root.getCluster().getMetadataQuery());
+
+ Assert.assertFalse(estimate.isUnknown());
+ Assert.assertEquals(0d, estimate.getRate(), 0.01);
+
+ Assert.assertEquals(5. - 3. / 2., estimate.getRowCount(), 0.01);
+ Assert.assertEquals(5. - 3. / 2., estimate.getWindow(), 0.01);
+ }
+
+ /**
+ * EXCEPT ALL between two windowed GROUP BYs over ORDER_DETAILS_UNBOUNDED, whose registered rate
+ * is 4 (createUnboundedTableStatistics(4d) in prepare()). Expects a rate of 4/2 - 4/4 and a row
+ * count of 0. The exact formula lives in the aggregation/minus rel estimates, which are not
+ * shown here.
+ */
+ @Test
+ public void testNodeStatsEstimationUnbounded() {
+ String sql =
+ "SELECT * "
+ + "FROM "
+ + "(select order_id FROM ORDER_DETAILS_UNBOUNDED "
+ + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1'
HOUR)) o1 "
+ + " EXCEPT ALL "
+ + " select order_id FROM ORDER_DETAILS_UNBOUNDED "
+ + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1'
HOUR) ";
+
+ RelNode root = env.parseQuery(sql);
+
+ // Descend through first inputs until the minus node is reached.
+ while (!(root instanceof BeamMinusRel)) {
+ root = root.getInput(0);
+ }
+
+ NodeStats estimate = BeamSqlRelUtils.getNodeStats(root,
root.getCluster().getMetadataQuery());
+
+ // note that we have group by
+ Assert.assertEquals(4d / 2 - 4d / 4, estimate.getRate(), 0.01);
+ Assert.assertEquals(0d, estimate.getRowCount(), 0.01);
+ }
}
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java
index 3e058a2..15cf8cb 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java
@@ -18,13 +18,16 @@
package org.apache.beam.sdk.extensions.sql.impl.rel;
import org.apache.beam.sdk.extensions.sql.TestUtils;
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
+import org.apache.calcite.rel.RelNode;
import org.joda.time.DateTime;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -102,7 +105,7 @@ public class BeamSortRelTest extends BaseRelTest {
}
@Test
- public void testOrderBy_basic() throws Exception {
+ public void testOrderBy_basic() {
String sql =
"INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT "
+ " order_id, site_id, price "
@@ -122,7 +125,7 @@ public class BeamSortRelTest extends BaseRelTest {
}
@Test
- public void testOrderBy_timestamp() throws Exception {
+ public void testOrderBy_timestamp() {
String sql =
"SELECT order_id, site_id, price, order_time "
+ "FROM ORDER_DETAILS "
@@ -158,7 +161,7 @@ public class BeamSortRelTest extends BaseRelTest {
}
@Test
- public void testOrderBy_nullsFirst() throws Exception {
+ public void testOrderBy_nullsFirst() {
Schema schema =
Schema.builder()
.addField("order_id", Schema.FieldType.INT64)
@@ -188,7 +191,7 @@ public class BeamSortRelTest extends BaseRelTest {
}
@Test
- public void testOrderBy_nullsLast() throws Exception {
+ public void testOrderBy_nullsLast() {
Schema schema =
Schema.builder()
.addField("order_id", Schema.FieldType.INT64)
@@ -218,7 +221,7 @@ public class BeamSortRelTest extends BaseRelTest {
}
@Test
- public void testOrderBy_with_offset2() throws Exception {
+ public void testOrderBy_with_offset2() {
Schema schema = Schema.builder().addField("count_star",
Schema.FieldType.INT64).build();
String sql =
@@ -232,7 +235,7 @@ public class BeamSortRelTest extends BaseRelTest {
}
@Test
- public void testOrderBy_with_offset() throws Exception {
+ public void testOrderBy_with_offset() {
String sql =
"INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT "
+ " order_id, site_id, price "
@@ -252,7 +255,7 @@ public class BeamSortRelTest extends BaseRelTest {
}
@Test
- public void testOrderBy_bigFetch() throws Exception {
+ public void testOrderBy_bigFetch() {
String sql =
"INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT "
+ " order_id, site_id, price "
@@ -288,4 +291,26 @@ public class BeamSortRelTest extends BaseRelTest {
TestPipeline pipeline = TestPipeline.create();
compilePipeline(sql, pipeline);
}
+
+ /**
+ * Estimates for ORDER BY ... LIMIT 11: the stats must be known, with a rate of 0 and a row
+ * count and window of 10. That 10 (below the LIMIT) presumably equals the ORDER_DETAILS fixture
+ * size — confirm in prepare().
+ */
+ @Test
+ public void testNodeStatsEstimation() {
+ String sql =
+ "SELECT order_id, site_id, price, order_time "
+ + "FROM ORDER_DETAILS "
+ + "ORDER BY order_time asc limit 11";
+
+ RelNode root = env.parseQuery(sql);
+
+ // Descend through first inputs until the sort node is reached.
+ while (!(root instanceof BeamSortRel)) {
+ root = root.getInput(0);
+ }
+
+ NodeStats estimate = BeamSqlRelUtils.getNodeStats(root,
root.getCluster().getMetadataQuery());
+
+ Assert.assertFalse(estimate.isUnknown());
+ Assert.assertEquals(0d, estimate.getRate(), 0.01);
+
+ Assert.assertEquals(10., estimate.getRowCount(), 0.01);
+ Assert.assertEquals(10., estimate.getWindow(), 0.01);
+ }
}
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUncollectRelTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUncollectRelTest.java
new file mode 100644
index 0000000..2992c72
--- /dev/null
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUncollectRelTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
+import org.apache.calcite.rel.RelNode;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Tests for {@code BeamUncollectRel}. */
+public class BeamUncollectRelTest extends BaseRelTest {
+ /**
+ * Parses {@code sql}, descends through first inputs to the {@link BeamUncollectRel} node and
+ * returns the node statistics estimated for it.
+ */
+ private NodeStats getEstimateOf(String sql) {
+ RelNode root = env.parseQuery(sql);
+
+ while (!(root instanceof BeamUncollectRel)) {
+ root = root.getInput(0);
+ }
+
+ return BeamSqlRelUtils.getNodeStats(root,
root.getCluster().getMetadataQuery());
+ }
+
+ /**
+ * UNNEST over two 3-element array rows. The query itself yields 6 rows, but the asserted
+ * planner estimate is 4 (row count and window) with a rate of 0. The estimate presumably
+ * scales the 2 input rows by a fixed factor — confirm against BeamUncollectRel.
+ */
+ @Test
+ public void testNodeStats() {
+ NodeStats estimate =
+ getEstimateOf(
+ "SELECT * FROM UNNEST (SELECT * FROM (VALUES (ARRAY ['a', 'b',
'c']),(ARRAY ['a', 'b', 'c']))) t1");
+
+ Assert.assertEquals(4d, estimate.getRowCount(), 0.001);
+ Assert.assertEquals(4d, estimate.getWindow(), 0.001);
+ Assert.assertEquals(0., estimate.getRate(), 0.001);
+ }
+}
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRelTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRelTest.java
index b7ee1a8..3ed476d 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRelTest.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRelTest.java
@@ -19,12 +19,15 @@ package org.apache.beam.sdk.extensions.sql.impl.rel;
import java.math.BigDecimal;
import org.apache.beam.sdk.extensions.sql.TestUtils;
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
+import org.apache.calcite.rel.RelNode;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
@@ -99,4 +102,54 @@ public class BeamUnionRelTest extends BaseRelTest {
.getRows());
pipeline.run();
}
+
+ /**
+ * Estimates for a distinct UNION of ORDER_DETAILS with itself: the stats must be known, with a
+ * rate of 0 and a row count and window of 2. The UNION ALL variant below expects 4 for the
+ * same inputs.
+ */
+ @Test
+ public void testNodeStatsEstimation() {
+ String sql =
+ "SELECT "
+ + " order_id, site_id, price "
+ + "FROM ORDER_DETAILS "
+ + " UNION SELECT "
+ + " order_id, site_id, price "
+ + "FROM ORDER_DETAILS ";
+
+ RelNode root = env.parseQuery(sql);
+
+ // Descend through first inputs until the union node is reached.
+ while (!(root instanceof BeamUnionRel)) {
+ root = root.getInput(0);
+ }
+
+ NodeStats estimate = BeamSqlRelUtils.getNodeStats(root,
root.getCluster().getMetadataQuery());
+
+ Assert.assertFalse(estimate.isUnknown());
+ Assert.assertEquals(0d, estimate.getRate(), 0.01);
+
+ Assert.assertEquals(2., estimate.getRowCount(), 0.01);
+ Assert.assertEquals(2., estimate.getWindow(), 0.01);
+ }
+
+ /**
+ * Estimates for UNION ALL of ORDER_DETAILS with itself: the stats must be known, with a rate of
+ * 0 and a row count and window of 4. That is twice the distinct-UNION estimate for the same
+ * inputs.
+ */
+ @Test
+ public void testNodeStatsEstimationUnionAll() {
+ String sql =
+ "SELECT "
+ + " order_id, site_id, price "
+ + "FROM ORDER_DETAILS "
+ + " UNION ALL SELECT "
+ + " order_id, site_id, price "
+ + "FROM ORDER_DETAILS ";
+
+ RelNode root = env.parseQuery(sql);
+
+ // Descend through first inputs until the union node is reached.
+ while (!(root instanceof BeamUnionRel)) {
+ root = root.getInput(0);
+ }
+
+ NodeStats estimate = BeamSqlRelUtils.getNodeStats(root,
root.getCluster().getMetadataQuery());
+
+ Assert.assertFalse(estimate.isUnknown());
+ Assert.assertEquals(0d, estimate.getRate(), 0.01);
+
+ Assert.assertEquals(4., estimate.getRowCount(), 0.01);
+ Assert.assertEquals(4., estimate.getWindow(), 0.01);
+ }
}
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRelTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRelTest.java
index 0dc8e26..065b558 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRelTest.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRelTest.java
@@ -18,12 +18,15 @@
package org.apache.beam.sdk.extensions.sql.impl.rel;
import org.apache.beam.sdk.extensions.sql.TestUtils;
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
+import org.apache.calcite.rel.RelNode;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
@@ -104,4 +107,25 @@ public class BeamValuesRelTest extends BaseRelTest {
.getRows());
pipeline.run();
}
+
+ /**
+ * Estimates for a VALUES clause with 9 literal rows: the stats must be known, with a rate of 0
+ * and a row count and window equal to the 9 listed rows.
+ */
+ @Test
+ public void testNodeStatsEstimation() {
+ String sql =
+ "SELECT * FROM (VALUES
('value1'),('value2'),('value3'),('value4'),('value5'),"
+ + " ('value6'),('value7'),('value8'),('value9'))";
+
+ RelNode root = env.parseQuery(sql);
+
+ // Descend through first inputs until the values node is reached.
+ while (!(root instanceof BeamValuesRel)) {
+ root = root.getInput(0);
+ }
+
+ NodeStats estimate = BeamSqlRelUtils.getNodeStats(root,
root.getCluster().getMetadataQuery());
+
+ Assert.assertFalse(estimate.isUnknown());
+ Assert.assertEquals(0d, estimate.getRate(), 0.01);
+
+ Assert.assertEquals(9., estimate.getRowCount(), 0.01);
+ Assert.assertEquals(9., estimate.getWindow(), 0.01);
+ }
}