This is an automated email from the ASF dual-hosted git repository. anton pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 3ca9149 [BEAM-7844] Implementing NodeStat Estimations for all the nodes new 7d56a23 Merge pull request #9198 from riazela/RowRateWindowEstimation 3ca9149 is described below commit 3ca91490e0644ab89e5febc9d139402877f939e1 Author: Alireza Samadian <alireza4...@gmail.com> AuthorDate: Thu Aug 1 09:02:11 2019 -0700 [BEAM-7844] Implementing NodeStat Estimations for all the nodes --- .../sql/impl/planner/NodeStatsMetadata.java | 4 +- .../sql/impl/rel/BeamAggregationRel.java | 39 ++++++++- .../sdk/extensions/sql/impl/rel/BeamCalcRel.java | 22 ++++- .../sdk/extensions/sql/impl/rel/BeamIOSinkRel.java | 2 +- .../extensions/sql/impl/rel/BeamIOSourceRel.java | 9 +- .../extensions/sql/impl/rel/BeamIntersectRel.java | 14 ++- .../sdk/extensions/sql/impl/rel/BeamJoinRel.java | 26 ++++-- .../sdk/extensions/sql/impl/rel/BeamMinusRel.java | 8 +- .../sdk/extensions/sql/impl/rel/BeamSortRel.java | 3 +- .../extensions/sql/impl/rel/BeamSqlRelUtils.java | 19 +++++ .../extensions/sql/impl/rel/BeamUncollectRel.java | 5 +- .../sdk/extensions/sql/impl/rel/BeamUnionRel.java | 10 ++- .../sdk/extensions/sql/impl/rel/BeamUnnestRel.java | 5 +- .../sdk/extensions/sql/impl/rel/BeamValuesRel.java | 2 +- .../extensions/sql/impl/planner/NodeStatsTest.java | 15 ++++ ...rceRelTest.java => BeamAggregationRelTest.java} | 70 ++++++++++----- ...amIOSourceRelTest.java => BeamCalcRelTest.java} | 77 ++++++++++++----- .../sql/impl/rel/BeamIOSourceRelTest.java | 43 +++++++++- .../sql/impl/rel/BeamIntersectRelTest.java | 27 ++++++ .../impl/rel/BeamJoinRelBoundedVsBoundedTest.java | 75 ++++++++++++++++ .../rel/BeamJoinRelUnboundedVsBoundedTest.java | 41 +++++++++ .../rel/BeamJoinRelUnboundedVsUnboundedTest.java | 50 ++++++++++- .../extensions/sql/impl/rel/BeamMinusRelTest.java | 99 +++++++++++++++++++++- .../extensions/sql/impl/rel/BeamSortRelTest.java | 39 +++++++-- .../sql/impl/rel/BeamUncollectRelTest.java | 47 ++++++++++ 
.../extensions/sql/impl/rel/BeamUnionRelTest.java | 53 ++++++++++++ .../extensions/sql/impl/rel/BeamValuesRelTest.java | 24 ++++++ 27 files changed, 745 insertions(+), 83 deletions(-) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/NodeStatsMetadata.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/NodeStatsMetadata.java index 4a9e79f..8bc62ee 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/NodeStatsMetadata.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/NodeStatsMetadata.java @@ -35,8 +35,8 @@ public interface NodeStatsMetadata extends Metadata { MetadataDef<NodeStatsMetadata> DEF = MetadataDef.of(NodeStatsMetadata.class, NodeStatsMetadata.Handler.class, METHOD); - // In order to use this we need to call it by relNode.metadata(RowRateWindowMetadata.class, - // mq).getRowRateWindow() where mq is the MetadataQuery (can be obtained by + // In order to use this we need to call it by relNode.metadata(NodeStatsMetadata.class, + // mq).getNodeStats() where mq is the MetadataQuery (can be obtained by // relNode.getCluster().getMetadataQuery()). After this, Calcite looks for the implementation of // this metadata that we have registered in MetadataProvider (it is RelMdNodeStats.class in // this case and we have registered it in CalciteQueryPlanner). 
Then Calcite's generated Code diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java index 14e7475..4e5978e 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java @@ -84,7 +84,44 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode { @Override public NodeStats estimateNodeStats(RelMetadataQuery mq) { - return NodeStats.create(mq.getRowCount(this)); + + NodeStats inputEstimate = BeamSqlRelUtils.getNodeStats(this.input, mq); + + inputEstimate = computeWindowingCostEffect(inputEstimate); + + NodeStats estimate; + // groupCount shows how many columns we have in the group by. One of them might be the windowing. + int groupCount = groupSet.cardinality() - (windowFn == null ? 0 : 1); + // This is similar to what Calcite does. If groupCount is zero then we have only one value + // per window for unbounded and we have only one value for bounded. e.g. select count(*) from A + // If group count is non-zero then the more columns we include in the group by, the more rows will be + // preserved. + return (groupCount == 0) + ? NodeStats.create( + Math.min(inputEstimate.getRowCount(), 1d), + inputEstimate.getRate() / inputEstimate.getWindow(), + 1d) + : inputEstimate.multiply(1.0 - Math.pow(.5, groupCount)); + } + + private NodeStats computeWindowingCostEffect(NodeStats inputStat) { + if (windowFn == null) { + return inputStat; + } + WindowFn w = windowFn; + double multiplicationFactor = 1; + // If the window is SlidingWindow, the number of tuples will increase. (Because, some of the + // tuples repeat in multiple windows). 
+ if (w instanceof SlidingWindows) { + multiplicationFactor = + ((double) ((SlidingWindows) w).getSize().getStandardSeconds()) + / ((SlidingWindows) w).getPeriod().getStandardSeconds(); + } + + return NodeStats.create( + inputStat.getRowCount() * multiplicationFactor, + inputStat.getRate() * multiplicationFactor, + BeamIOSourceRel.CONSTANT_WINDOW_SIZE); } @Override diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java index 31ae94a..755aaec 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java @@ -73,6 +73,8 @@ import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Calc; import org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexLocalRef; +import org.apache.calcite.rex.RexNode; import org.apache.calcite.rex.RexProgram; import org.apache.calcite.rex.RexSimplify; import org.apache.calcite.rex.RexUtil; @@ -216,7 +218,25 @@ public class BeamCalcRel extends Calc implements BeamRelNode { @Override public NodeStats estimateNodeStats(RelMetadataQuery mq) { - return NodeStats.create(mq.getRowCount(this)); + NodeStats inputStat = BeamSqlRelUtils.getNodeStats(this.input, mq); + double selectivity = estimateFilterSelectivity(getInput(), program, mq); + + return inputStat.multiply(selectivity); + } + + private static double estimateFilterSelectivity( + RelNode child, RexProgram program, RelMetadataQuery mq) { + // Similar to calcite, if the calc node is representing filter operation we estimate the filter + // selectivity based on the number of equality conditions, number of inequality conditions, .... 
+ RexLocalRef programCondition = program.getCondition(); + RexNode condition; + if (programCondition == null) { + condition = null; + } else { + condition = program.expandLocalRef(programCondition); + } + // Currently this gets the selectivity based on Calcite's Selectivity Handler (RelMdSelectivity) + return mq.getSelectivity(child, condition); } public boolean isInputSortRelAndLimitOnly() { diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java index 5aaa635..5738e44 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java @@ -74,7 +74,7 @@ public class BeamIOSinkRel extends TableModify @Override public NodeStats estimateNodeStats(RelMetadataQuery mq) { - return NodeStats.create(mq.getRowCount(this)); + return BeamSqlRelUtils.getNodeStats(this.input, mq); } @Override diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java index d87f152..6305924 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java @@ -37,7 +37,7 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery; /** BeamRelNode to replace a {@code TableScan} node. 
*/ public class BeamIOSourceRel extends TableScan implements BeamRelNode { - + public static final double CONSTANT_WINDOW_SIZE = 10d; private final BeamSqlTable beamTable; private final BeamCalciteTable calciteTable; private final Map<String, String> pipelineOptions; @@ -66,7 +66,12 @@ public class BeamIOSourceRel extends TableScan implements BeamRelNode { @Override public NodeStats estimateNodeStats(RelMetadataQuery mq) { - return NodeStats.create(mq.getRowCount(this)); + BeamTableStatistics rowCountStatistics = calciteTable.getStatistic(); + double window = + (beamTable.isBounded() == PCollection.IsBounded.BOUNDED) + ? rowCountStatistics.getRowCount() + : CONSTANT_WINDOW_SIZE; + return NodeStats.create(rowCountStatistics.getRowCount(), rowCountStatistics.getRate(), window); } @Override diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java index a90972a..7fcc6c2 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java @@ -54,6 +54,18 @@ public class BeamIntersectRel extends Intersect implements BeamRelNode { @Override public NodeStats estimateNodeStats(RelMetadataQuery mq) { - return NodeStats.create(mq.getRowCount(this)); + // This takes the minimum of the inputs for all the estimate factors. 
+ double minimumRows = Double.POSITIVE_INFINITY; + double minimumWindowSize = Double.POSITIVE_INFINITY; + double minimumRate = Double.POSITIVE_INFINITY; + + for (RelNode input : inputs) { + NodeStats inputEstimates = BeamSqlRelUtils.getNodeStats(input, mq); + minimumRows = Math.min(minimumRows, inputEstimates.getRowCount()); + minimumRate = Math.min(minimumRate, inputEstimates.getRate()); + minimumWindowSize = Math.min(minimumWindowSize, inputEstimates.getWindow()); + } + + return NodeStats.create(minimumRows, minimumRate, minimumWindowSize).multiply(0.5); } } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java index 167b8a5..4ec6b76 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java @@ -57,8 +57,6 @@ import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Optional; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.calcite.plan.RelOptCluster; -import org.apache.calcite.plan.RelOptCost; -import org.apache.calcite.plan.RelOptPlanner; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.CorrelationId; @@ -138,13 +136,25 @@ public class BeamJoinRel extends Join implements BeamRelNode { } @Override - public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) { - return super.computeSelfCost(planner, mq); - } - - @Override public NodeStats estimateNodeStats(RelMetadataQuery mq) { - return NodeStats.create(mq.getRowCount(this)); + double selectivity = mq.getSelectivity(this, getCondition()); + NodeStats leftEstimates = 
BeamSqlRelUtils.getNodeStats(this.left, mq); + NodeStats rightEstimates = BeamSqlRelUtils.getNodeStats(this.right, mq); + + if (leftEstimates.isUnknown() || rightEstimates.isUnknown()) { + return NodeStats.UNKNOWN; + } + // If any of the inputs are unbounded row count becomes zero (one of them would be zero) + // If one is bounded and one unbounded the rate will be window of the bounded (= its row count) + // multiplied by the rate of the unbounded one + // If both are unbounded, the rate will be multiplication of each rate into the window of the + // other. + return NodeStats.create( + leftEstimates.getRowCount() * rightEstimates.getRowCount() * selectivity, + (leftEstimates.getRate() * rightEstimates.getWindow() + + rightEstimates.getRate() * leftEstimates.getWindow()) + * selectivity, + leftEstimates.getWindow() * rightEstimates.getWindow() * selectivity); } /** diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java index 0a8103d..416c86b 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java @@ -54,6 +54,12 @@ public class BeamMinusRel extends Minus implements BeamRelNode { @Override public NodeStats estimateNodeStats(RelMetadataQuery mq) { - return NodeStats.create(mq.getRowCount(this)); + NodeStats firstInputEstimates = BeamSqlRelUtils.getNodeStats(inputs.get(0), mq); + // The first input minus half of the others. 
(We are assuming half of them have intersection) + for (int i = 1; i < inputs.size(); i++) { + NodeStats inputEstimate = BeamSqlRelUtils.getNodeStats(inputs.get(i), mq); + firstInputEstimates = firstInputEstimates.minus(inputEstimate.multiply(0.5)); + } + return firstInputEstimates; } } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java index f252f11..61f7858 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java @@ -138,7 +138,8 @@ public class BeamSortRel extends Sort implements BeamRelNode { @Override public NodeStats estimateNodeStats(RelMetadataQuery mq) { - return NodeStats.create(mq.getRowCount(this)); + // Sorting does not change rate or row count of the input. 
+ return BeamSqlRelUtils.getNodeStats(this.input, mq); } public boolean isLimitOnly() { diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSqlRelUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSqlRelUtils.java index 9f0fd55..fb44f28 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSqlRelUtils.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSqlRelUtils.java @@ -22,12 +22,15 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats; +import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStatsMetadata; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; import org.apache.beam.sdk.values.Row; import org.apache.calcite.plan.volcano.RelSubset; import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.metadata.RelMetadataQuery; /** Utilities for {@code BeamRelNode}. */ public class BeamSqlRelUtils { @@ -76,4 +79,20 @@ public class BeamSqlRelUtils { } return (BeamRelNode) input; } + + public static RelNode getInput(RelNode input) { + RelNode result = input; + if (input instanceof RelSubset) { + // go with known best input + result = ((RelSubset) input).getBest(); + result = result == null ? 
((RelSubset) input).getOriginal() : result; + } + + return result; + } + + public static NodeStats getNodeStats(RelNode input, RelMetadataQuery mq) { + input = getInput(input); + return input.metadata(NodeStatsMetadata.class, mq).getNodeStats(); + } } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUncollectRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUncollectRel.java index 28d2d69..7bca5c4 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUncollectRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUncollectRel.java @@ -76,7 +76,10 @@ public class BeamUncollectRel extends Uncollect implements BeamRelNode { @Override public NodeStats estimateNodeStats(RelMetadataQuery mq) { - return NodeStats.create(mq.getRowCount(this)); + // We estimate the average length of each array by a constant. + // We might be able to get an estimate of the length by making a MetadataHandler for this + // purpose, and get the estimate by reading the first couple of the rows in the source. 
+ return BeamSqlRelUtils.getNodeStats(this.input, mq).multiply(2); } private static class UncollectDoFn extends DoFn<Row, Row> { diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java index 95a1826..b4a6670 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java @@ -81,6 +81,14 @@ public class BeamUnionRel extends Union implements BeamRelNode { @Override public NodeStats estimateNodeStats(RelMetadataQuery mq) { - return NodeStats.create(mq.getRowCount(this)); + // The summation of the input stats + NodeStats summationOfEstimates = + inputs.stream() + .map(input -> BeamSqlRelUtils.getNodeStats(input, mq)) + .reduce(NodeStats.create(0, 0, 0), NodeStats::plus); + // If all is set then we propagate duplicated values. Otherwise we assume a constant factor of + // them are duplicate. + summationOfEstimates = all ? summationOfEstimates : summationOfEstimates.multiply(0.5); + return summationOfEstimates; } } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnnestRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnnestRel.java index b51df06..1687dec 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnnestRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnnestRel.java @@ -78,7 +78,10 @@ public class BeamUnnestRel extends Uncollect implements BeamRelNode { @Override public NodeStats estimateNodeStats(RelMetadataQuery mq) { - return NodeStats.create(mq.getRowCount(this)); + // We estimate the average length of each array by a constant. 
+ // We might be able to get an estimate of the length by making a MetadataHandler for this + // purpose, and get the estimate by reading the first couple of the rows in the source. + return BeamSqlRelUtils.getNodeStats(this.input, mq).multiply(2); } @Override diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java index c4405ba..1ad51b6 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java @@ -96,6 +96,6 @@ public class BeamValuesRel extends Values implements BeamRelNode { @Override public NodeStats estimateNodeStats(RelMetadataQuery mq) { - return NodeStats.create(mq.getRowCount(this)); + return NodeStats.create(tuples.size(), 0, tuples.size()); } } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/planner/NodeStatsTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/planner/NodeStatsTest.java index 820f4fc..10e0b61 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/planner/NodeStatsTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/planner/NodeStatsTest.java @@ -18,10 +18,12 @@ package org.apache.beam.sdk.extensions.sql.impl.planner; import org.apache.beam.sdk.extensions.sql.impl.rel.BaseRelTest; +import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils; import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable; import org.apache.beam.sdk.schemas.Schema; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.plan.volcano.RelSubset; import org.apache.calcite.rel.RelNode; import 
org.apache.calcite.rel.SingleRel; import org.junit.Assert; @@ -76,4 +78,17 @@ public class NodeStatsTest extends BaseRelTest { root.metadata(NodeStatsMetadata.class, root.getCluster().getMetadataQuery()).getNodeStats(); Assert.assertFalse(nodeStats.isUnknown()); } + + @Test + public void testSubsetHavingBest() { + String sql = " select * from ORDER_DETAILS1 "; + RelNode root = env.parseQuery(sql); + root = root.getCluster().getPlanner().getRoot(); + + // tests if we are actually testing what we want. + Assert.assertTrue(root instanceof RelSubset); + + NodeStats estimates = BeamSqlRelUtils.getNodeStats(root, root.getCluster().getMetadataQuery()); + Assert.assertFalse(estimates.isUnknown()); + } } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRelTest.java similarity index 60% copy from sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRelTest.java copy to sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRelTest.java index 3da71e0..df9305b 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRelTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRelTest.java @@ -19,25 +19,21 @@ package org.apache.beam.sdk.extensions.sql.impl.rel; import java.math.BigDecimal; import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics; +import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats; import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable; import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestUnboundedTable; import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.testing.TestPipeline; import org.apache.calcite.rel.RelNode; -import 
org.apache.calcite.rel.metadata.RelMetadataQuery; import org.joda.time.DateTime; import org.joda.time.Duration; import org.junit.Assert; import org.junit.BeforeClass; -import org.junit.Rule; import org.junit.Test; -/** Test for {@code BeamIOSourceRel}. */ -public class BeamIOSourceRelTest extends BaseRelTest { - @Rule public final TestPipeline pipeline = TestPipeline.create(); - - public static final DateTime FIRST_DATE = new DateTime(1); - public static final DateTime SECOND_DATE = new DateTime(1 + 3600 * 1000); +/** Tests related to {@code BeamAggregationRel}. */ +public class BeamAggregationRelTest extends BaseRelTest { + private static final DateTime FIRST_DATE = new DateTime(1); + private static final DateTime SECOND_DATE = new DateTime(1 + 3600 * 1000); private static final Duration WINDOW_SIZE = Duration.standardHours(1); @@ -100,29 +96,57 @@ public class BeamIOSourceRelTest extends BaseRelTest { .setStatistics(BeamTableStatistics.createUnboundedTableStatistics(2d))); } - @Test - public void boundedRowCount() { - String sql = "SELECT * FROM ORDER_DETAILS_BOUNDED"; - + private NodeStats getEstimateOf(String sql) { RelNode root = env.parseQuery(sql); - while (!(root instanceof BeamIOSourceRel)) { - root = env.parseQuery(sql).getInput(0); + while (!(root instanceof BeamAggregationRel)) { + root = root.getInput(0); } - Assert.assertEquals(5d, root.estimateRowCount(RelMetadataQuery.instance()), 0.001); + return BeamSqlRelUtils.getNodeStats(root, root.getCluster().getMetadataQuery()); } @Test - public void unboundedRowCount() { - String sql = "SELECT * FROM ORDER_DETAILS_UNBOUNDED"; + public void testNodeStats() { + String sql = "SELECT order_id FROM ORDER_DETAILS_BOUNDED " + " GROUP BY order_id "; - RelNode root = env.parseQuery(sql); + NodeStats estimate = getEstimateOf(sql); - while (!(root instanceof BeamIOSourceRel)) { - root = env.parseQuery(sql).getInput(0); - } + Assert.assertEquals(5d / 2, estimate.getRowCount(), 0.001); + Assert.assertEquals(5d / 2, 
estimate.getWindow(), 0.001); + Assert.assertEquals(0., estimate.getRate(), 0.001); + } + + @Test + public void testNodeStatsEffectOfGroupSet() { + String sql1 = "SELECT order_id FROM ORDER_DETAILS_BOUNDED " + " GROUP BY order_id "; + String sql2 = + "SELECT order_id, site_id FROM ORDER_DETAILS_BOUNDED " + " GROUP BY order_id, site_id "; - Assert.assertEquals(2d, root.estimateRowCount(RelMetadataQuery.instance()), 0.001); + NodeStats estimate1 = getEstimateOf(sql1); + + NodeStats estimate2 = getEstimateOf(sql2); + + Assert.assertTrue(estimate1.getRowCount() < estimate2.getRowCount()); + Assert.assertTrue(estimate1.getWindow() < estimate2.getWindow()); + } + + @Test + public void testNodeStatsUnboundedWindow() { + String sql = + "select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS_UNBOUNDED " + + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)"; + NodeStats estimate1 = getEstimateOf(sql); + Assert.assertEquals(1d, estimate1.getRate(), 0.01); + Assert.assertEquals(BeamIOSourceRel.CONSTANT_WINDOW_SIZE / 2, estimate1.getWindow(), 0.01); + } + + @Test + public void testNodeStatsSlidingWindow() { + String sql = + "select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS_UNBOUNDED " + + " GROUP BY order_id, HOP(order_time, INTERVAL '1' SECOND,INTERVAL '3' SECOND)"; + NodeStats estimate1 = getEstimateOf(sql); + Assert.assertEquals(3d, estimate1.getRate(), 0.01); } } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRelTest.java similarity index 55% copy from sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRelTest.java copy to sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRelTest.java index 3da71e0..ad64f0d 100644 --- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRelTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRelTest.java @@ -19,25 +19,21 @@ package org.apache.beam.sdk.extensions.sql.impl.rel; import java.math.BigDecimal; import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics; +import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats; import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable; import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestUnboundedTable; import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.testing.TestPipeline; import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.metadata.RelMetadataQuery; import org.joda.time.DateTime; import org.joda.time.Duration; import org.junit.Assert; import org.junit.BeforeClass; -import org.junit.Rule; import org.junit.Test; -/** Test for {@code BeamIOSourceRel}. */ -public class BeamIOSourceRelTest extends BaseRelTest { - @Rule public final TestPipeline pipeline = TestPipeline.create(); - - public static final DateTime FIRST_DATE = new DateTime(1); - public static final DateTime SECOND_DATE = new DateTime(1 + 3600 * 1000); +/** Tests related to {@code BeamCalcRel}. 
*/ +public class BeamCalcRelTest extends BaseRelTest { + private static final DateTime FIRST_DATE = new DateTime(1); + private static final DateTime SECOND_DATE = new DateTime(1 + 3600 * 1000); private static final Duration WINDOW_SIZE = Duration.standardHours(1); @@ -101,28 +97,67 @@ public class BeamIOSourceRelTest extends BaseRelTest { } @Test - public void boundedRowCount() { - String sql = "SELECT * FROM ORDER_DETAILS_BOUNDED"; + public void testProjectionNodeStats() { + String sql = "SELECT order_id FROM ORDER_DETAILS_BOUNDED"; RelNode root = env.parseQuery(sql); - while (!(root instanceof BeamIOSourceRel)) { - root = env.parseQuery(sql).getInput(0); - } + Assert.assertTrue(root instanceof BeamCalcRel); + + NodeStats estimate = BeamSqlRelUtils.getNodeStats(root, root.getCluster().getMetadataQuery()); - Assert.assertEquals(5d, root.estimateRowCount(RelMetadataQuery.instance()), 0.001); + Assert.assertEquals(5d, estimate.getRowCount(), 0.001); + Assert.assertEquals(5d, estimate.getWindow(), 0.001); + Assert.assertEquals(0., estimate.getRate(), 0.001); } @Test - public void unboundedRowCount() { - String sql = "SELECT * FROM ORDER_DETAILS_UNBOUNDED"; + public void testFilterNodeStats() { + String sql = "SELECT * FROM ORDER_DETAILS_BOUNDED where order_id=1"; RelNode root = env.parseQuery(sql); - while (!(root instanceof BeamIOSourceRel)) { - root = env.parseQuery(sql).getInput(0); - } + Assert.assertTrue(root instanceof BeamCalcRel); + + NodeStats estimate = BeamSqlRelUtils.getNodeStats(root, root.getCluster().getMetadataQuery()); + + Assert.assertTrue(5d > estimate.getRowCount()); + Assert.assertTrue(5d > estimate.getWindow()); + Assert.assertEquals(0., estimate.getRate(), 0.001); + } + + @Test + public void testNodeStatsConditionType() { + String equalSql = "SELECT * FROM ORDER_DETAILS_BOUNDED where order_id=1"; + String geqSql = "SELECT * FROM ORDER_DETAILS_BOUNDED where order_id>=1"; + + RelNode equalRoot = env.parseQuery(equalSql); + RelNode geqRoot = 
env.parseQuery(geqSql); + + NodeStats equalEstimate = + BeamSqlRelUtils.getNodeStats(equalRoot, equalRoot.getCluster().getMetadataQuery()); + NodeStats geqEstimate = + BeamSqlRelUtils.getNodeStats(geqRoot, geqRoot.getCluster().getMetadataQuery()); + + Assert.assertTrue(geqEstimate.getRowCount() > equalEstimate.getRowCount()); + Assert.assertTrue(geqEstimate.getWindow() > equalEstimate.getWindow()); + } + + @Test + public void testNodeStatsNumberOfConditions() { + String equalSql = "SELECT * FROM ORDER_DETAILS_BOUNDED where order_id=1"; + String doubleEqualSql = "SELECT * FROM ORDER_DETAILS_BOUNDED WHERE order_id=1 AND site_id=2 "; + + RelNode equalRoot = env.parseQuery(equalSql); + RelNode doubleEqualRoot = env.parseQuery(doubleEqualSql); + + NodeStats equalEstimate = + BeamSqlRelUtils.getNodeStats(equalRoot, equalRoot.getCluster().getMetadataQuery()); + NodeStats doubleEqualEstimate = + BeamSqlRelUtils.getNodeStats( + doubleEqualRoot, doubleEqualRoot.getCluster().getMetadataQuery()); - Assert.assertEquals(2d, root.estimateRowCount(RelMetadataQuery.instance()), 0.001); + Assert.assertTrue(doubleEqualEstimate.getRowCount() < equalEstimate.getRowCount()); + Assert.assertTrue(doubleEqualEstimate.getWindow() < equalEstimate.getWindow()); } } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRelTest.java index 3da71e0..22fb229 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRelTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRelTest.java @@ -19,6 +19,7 @@ package org.apache.beam.sdk.extensions.sql.impl.rel; import java.math.BigDecimal; import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics; +import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats; import 
org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable; import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestUnboundedTable; import org.apache.beam.sdk.schemas.Schema; @@ -36,8 +37,8 @@ import org.junit.Test; public class BeamIOSourceRelTest extends BaseRelTest { @Rule public final TestPipeline pipeline = TestPipeline.create(); - public static final DateTime FIRST_DATE = new DateTime(1); - public static final DateTime SECOND_DATE = new DateTime(1 + 3600 * 1000); + private static final DateTime FIRST_DATE = new DateTime(1); + private static final DateTime SECOND_DATE = new DateTime(1 + 3600 * 1000); private static final Duration WINDOW_SIZE = Duration.standardHours(1); @@ -107,7 +108,7 @@ public class BeamIOSourceRelTest extends BaseRelTest { RelNode root = env.parseQuery(sql); while (!(root instanceof BeamIOSourceRel)) { - root = env.parseQuery(sql).getInput(0); + root = root.getInput(0); } Assert.assertEquals(5d, root.estimateRowCount(RelMetadataQuery.instance()), 0.001); @@ -120,9 +121,43 @@ public class BeamIOSourceRelTest extends BaseRelTest { RelNode root = env.parseQuery(sql); while (!(root instanceof BeamIOSourceRel)) { - root = env.parseQuery(sql).getInput(0); + root = root.getInput(0); } Assert.assertEquals(2d, root.estimateRowCount(RelMetadataQuery.instance()), 0.001); } + + @Test + public void testBoundedNodeStats() { + String sql = "SELECT * FROM ORDER_DETAILS_BOUNDED"; + + RelNode root = env.parseQuery(sql); + + while (!(root instanceof BeamIOSourceRel)) { + root = root.getInput(0); + } + + NodeStats estimate = BeamSqlRelUtils.getNodeStats(root, root.getCluster().getMetadataQuery()); + + Assert.assertEquals(5d, estimate.getRowCount(), 0.01); + Assert.assertEquals(0d, estimate.getRate(), 0.01); + Assert.assertEquals(5d, estimate.getWindow(), 0.01); + } + + @Test + public void testUnboundedNodeStats() { + String sql = "SELECT * FROM ORDER_DETAILS_UNBOUNDED"; + + RelNode root = env.parseQuery(sql); + + while (!(root instanceof 
BeamIOSourceRel)) { + root = root.getInput(0); + } + + NodeStats estimate = BeamSqlRelUtils.getNodeStats(root, root.getCluster().getMetadataQuery()); + + Assert.assertEquals(0d, estimate.getRowCount(), 0.01); + Assert.assertEquals(2d, estimate.getRate(), 0.01); + Assert.assertEquals(BeamIOSourceRel.CONSTANT_WINDOW_SIZE, estimate.getWindow(), 0.01); + } } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRelTest.java index 1bcbed4..2b58272 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRelTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRelTest.java @@ -19,12 +19,15 @@ package org.apache.beam.sdk.extensions.sql.impl.rel; import java.math.BigDecimal; import org.apache.beam.sdk.extensions.sql.TestUtils; +import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats; import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; +import org.apache.calcite.rel.RelNode; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; @@ -121,4 +124,28 @@ public class BeamIntersectRelTest extends BaseRelTest { pipeline.run(); } + + @Test + public void testNodeStatsEstimation() { + String sql = + "SELECT order_id, site_id, price " + + " FROM ORDER_DETAILS1 " + + " INTERSECT " + + " SELECT order_id, site_id, price " + + " FROM ORDER_DETAILS2 "; + + RelNode root = env.parseQuery(sql); + + while (!(root instanceof BeamIntersectRel)) { + root = root.getInput(0); + } + + NodeStats estimate = 
BeamSqlRelUtils.getNodeStats(root, root.getCluster().getMetadataQuery()); + + Assert.assertFalse(estimate.isUnknown()); + Assert.assertEquals(0d, estimate.getRate(), 0.01); + + Assert.assertEquals(3. / 2., estimate.getRowCount(), 0.01); + Assert.assertEquals(3. / 2., estimate.getWindow(), 0.01); + } } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java index d8e8a61..f208ecc 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java @@ -18,13 +18,16 @@ package org.apache.beam.sdk.extensions.sql.impl.rel; import org.apache.beam.sdk.extensions.sql.TestUtils; +import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats; import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; +import org.apache.calcite.rel.RelNode; import org.hamcrest.core.StringContains; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; @@ -82,6 +85,78 @@ public class BeamJoinRelBoundedVsBoundedTest extends BaseRelTest { } @Test + public void testNodeStatsEstimation() { + String sql = + "SELECT * " + + " FROM ORDER_DETAILS1 o1 " + + " JOIN ORDER_DETAILS2 o2 " + + " on " + + " o1.order_id=o2.site_id "; + + RelNode root = env.parseQuery(sql); + + while (!(root instanceof BeamJoinRel)) { + root = root.getInput(0); + } + + NodeStats estimate = BeamSqlRelUtils.getNodeStats(root, 
root.getCluster().getMetadataQuery()); + NodeStats leftEstimate = + BeamSqlRelUtils.getNodeStats( + ((BeamJoinRel) root).getLeft(), root.getCluster().getMetadataQuery()); + NodeStats rightEstimate = + BeamSqlRelUtils.getNodeStats( + ((BeamJoinRel) root).getRight(), root.getCluster().getMetadataQuery()); + + Assert.assertFalse(estimate.isUnknown()); + Assert.assertEquals(0d, estimate.getRate(), 0.01); + + Assert.assertNotEquals(0d, estimate.getRowCount(), 0.001); + Assert.assertTrue( + estimate.getRowCount() < leftEstimate.getRowCount() * rightEstimate.getRowCount()); + + Assert.assertNotEquals(0d, estimate.getWindow(), 0.001); + Assert.assertTrue(estimate.getWindow() < leftEstimate.getWindow() * rightEstimate.getWindow()); + } + + @Test + public void testNodeStatsOfMoreConditions() { + String sql1 = + "SELECT * " + + " FROM ORDER_DETAILS1 o1 " + + " JOIN ORDER_DETAILS2 o2 " + + " on " + + " o1.order_id=o2.site_id "; + + String sql2 = + "SELECT * " + + " FROM ORDER_DETAILS1 o1 " + + " JOIN ORDER_DETAILS2 o2 " + + " on " + + " o1.order_id=o2.site_id AND o2.price=o1.site_id"; + + RelNode root1 = env.parseQuery(sql1); + + while (!(root1 instanceof BeamJoinRel)) { + root1 = root1.getInput(0); + } + + RelNode root2 = env.parseQuery(sql2); + + while (!(root2 instanceof BeamJoinRel)) { + root2 = root2.getInput(0); + } + + NodeStats estimate1 = + BeamSqlRelUtils.getNodeStats(root1, root1.getCluster().getMetadataQuery()); + NodeStats estimate2 = + BeamSqlRelUtils.getNodeStats(root2, root2.getCluster().getMetadataQuery()); + + Assert.assertNotEquals(0d, estimate2.getRowCount(), 0.001); + // A join with two conditions should have lower estimate. 
+ Assert.assertTrue(estimate2.getRowCount() < estimate1.getRowCount()); + } + + @Test public void testLeftOuterJoin() throws Exception { String sql = "SELECT * " diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java index 190b091..e1a5d8b 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java @@ -21,6 +21,7 @@ import java.util.Arrays; import java.util.List; import org.apache.beam.sdk.extensions.sql.BeamSqlSeekableTable; import org.apache.beam.sdk.extensions.sql.TestUtils; +import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats; import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable; import org.apache.beam.sdk.extensions.sql.impl.transform.BeamSqlOutputToConsoleFn; import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable; @@ -34,8 +35,10 @@ import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.Row; +import org.apache.calcite.rel.RelNode; import org.joda.time.DateTime; import org.joda.time.Duration; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; @@ -178,6 +181,44 @@ public class BeamJoinRelUnboundedVsBoundedTest extends BaseRelTest { } @Test + public void testNodeStatsEstimation() { + String sql = + "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM " + + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " + + " JOIN " + + " ORDER_DETAILS1 o2 " + + " on " + + " 
o1.order_id=o2.order_id"; + + RelNode root = env.parseQuery(sql); + + while (!(root instanceof BeamJoinRel)) { + root = root.getInput(0); + } + + NodeStats estimate = BeamSqlRelUtils.getNodeStats(root, root.getCluster().getMetadataQuery()); + NodeStats leftEstimate = + BeamSqlRelUtils.getNodeStats( + ((BeamJoinRel) root).getLeft(), root.getCluster().getMetadataQuery()); + NodeStats rightEstimate = + BeamSqlRelUtils.getNodeStats( + ((BeamJoinRel) root).getRight(), root.getCluster().getMetadataQuery()); + + Assert.assertFalse(estimate.isUnknown()); + Assert.assertEquals(0d, estimate.getRowCount(), 0.01); + + Assert.assertNotEquals(0d, estimate.getRate(), 0.001); + Assert.assertTrue( + estimate.getRate() + < leftEstimate.getRowCount() * rightEstimate.getWindow() + + rightEstimate.getRowCount() * leftEstimate.getWindow()); + + Assert.assertNotEquals(0d, estimate.getWindow(), 0.001); + Assert.assertTrue(estimate.getWindow() < leftEstimate.getWindow() * rightEstimate.getWindow()); + } + + @Test public void testLeftOuterJoin() throws Exception { String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM " diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java index 869b1b8..1f19cf8 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java @@ -18,6 +18,8 @@ package org.apache.beam.sdk.extensions.sql.impl.rel; import org.apache.beam.sdk.extensions.sql.TestUtils; +import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics; +import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats; import 
org.apache.beam.sdk.extensions.sql.impl.transform.BeamSqlOutputToConsoleFn; import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestUnboundedTable; import org.apache.beam.sdk.schemas.Schema; @@ -26,8 +28,10 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; +import org.apache.calcite.rel.RelNode; import org.joda.time.DateTime; import org.joda.time.Duration; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; @@ -35,8 +39,8 @@ import org.junit.Test; /** Unbounded + Unbounded Test for {@code BeamJoinRel}. */ public class BeamJoinRelUnboundedVsUnboundedTest extends BaseRelTest { @Rule public final TestPipeline pipeline = TestPipeline.create(); - public static final DateTime FIRST_DATE = new DateTime(1); - public static final DateTime SECOND_DATE = new DateTime(1 + 3600 * 1000); + private static final DateTime FIRST_DATE = new DateTime(1); + private static final DateTime SECOND_DATE = new DateTime(1 + 3600 * 1000); private static final Duration WINDOW_SIZE = Duration.standardHours(1); @@ -72,7 +76,8 @@ public class BeamJoinRelUnboundedVsUnboundedTest extends BaseRelTest { 2, 3, 3, - SECOND_DATE)); + SECOND_DATE) + .setStatistics(BeamTableStatistics.createUnboundedTableStatistics(3d))); } @Test @@ -103,6 +108,45 @@ public class BeamJoinRelUnboundedVsUnboundedTest extends BaseRelTest { } @Test + public void testNodeStatsEstimation() { + String sql = + "SELECT * FROM " + + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " + + " JOIN " + + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o2 " + + " on " + + " o1.order_id=o2.order_id"; + + RelNode root = env.parseQuery(sql); + + while (!(root instanceof BeamJoinRel)) { 
+ root = root.getInput(0); + } + + NodeStats estimate = BeamSqlRelUtils.getNodeStats(root, root.getCluster().getMetadataQuery()); + NodeStats leftEstimate = + BeamSqlRelUtils.getNodeStats( + ((BeamJoinRel) root).getLeft(), root.getCluster().getMetadataQuery()); + NodeStats rightEstimate = + BeamSqlRelUtils.getNodeStats( + ((BeamJoinRel) root).getRight(), root.getCluster().getMetadataQuery()); + + Assert.assertFalse(estimate.isUnknown()); + Assert.assertEquals(0d, estimate.getRowCount(), 0.01); + + Assert.assertNotEquals(0d, estimate.getRate(), 0.001); + Assert.assertTrue( + estimate.getRate() + < leftEstimate.getRate() * rightEstimate.getWindow() + + rightEstimate.getRate() * leftEstimate.getWindow()); + + Assert.assertNotEquals(0d, estimate.getWindow(), 0.001); + Assert.assertTrue(estimate.getWindow() < leftEstimate.getWindow() * rightEstimate.getWindow()); + } + + @Test public void testLeftOuterJoin() throws Exception { String sql = "SELECT * FROM " diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java index 6484360..c29eeb2 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java @@ -19,12 +19,19 @@ package org.apache.beam.sdk.extensions.sql.impl.rel; import java.math.BigDecimal; import org.apache.beam.sdk.extensions.sql.TestUtils; +import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics; +import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats; import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable; +import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestUnboundedTable; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.testing.PAssert; import 
org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; +import org.apache.calcite.rel.RelNode; +import org.joda.time.DateTime; +import org.joda.time.Duration; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; @@ -33,6 +40,11 @@ import org.junit.Test; public class BeamMinusRelTest extends BaseRelTest { @Rule public final TestPipeline pipeline = TestPipeline.create(); + private static final DateTime FIRST_DATE = new DateTime(1); + private static final DateTime SECOND_DATE = new DateTime(1 + 3600 * 1000); + + private static final Duration WINDOW_SIZE = Duration.standardHours(1); + @BeforeClass public static void prepare() { registerTable( @@ -74,10 +86,43 @@ public class BeamMinusRelTest extends BaseRelTest { 3L, 3, new BigDecimal(3.0))); + + registerTable( + "ORDER_DETAILS_UNBOUNDED", + TestUnboundedTable.of( + Schema.FieldType.INT32, "order_id", + Schema.FieldType.INT32, "site_id", + Schema.FieldType.INT32, "price", + Schema.FieldType.DATETIME, "order_time") + .timestampColumnIndex(3) + .addRows(Duration.ZERO, 1, 1, 1, FIRST_DATE, 1, 2, 6, FIRST_DATE) + .addRows( + WINDOW_SIZE.plus(Duration.standardMinutes(1)), + 2, + 2, + 7, + SECOND_DATE, + 2, + 3, + 8, + SECOND_DATE, + // this late record is omitted(First window) + 1, + 3, + 3, + FIRST_DATE) + .addRows( + // this late record is omitted(Second window) + WINDOW_SIZE.plus(WINDOW_SIZE).plus(Duration.standardMinutes(1)), + 2, + 3, + 3, + SECOND_DATE) + .setStatistics(BeamTableStatistics.createUnboundedTableStatistics(4d))); } @Test - public void testExcept() throws Exception { + public void testExcept() { String sql = ""; sql += "SELECT order_id, site_id, price " @@ -100,7 +145,7 @@ public class BeamMinusRelTest extends BaseRelTest { } @Test - public void testExceptAll() throws Exception { + public void testExceptAll() { String sql = ""; sql += "SELECT order_id, site_id, price " @@ -134,7 +179,7 
@@ public class BeamMinusRelTest extends BaseRelTest { } @Test - public void testExceptRemovesDuplicates() throws Exception { + public void testExceptRemovesDuplicates() { String sql = "(SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 1) EXCEPT SELECT 1"; PCollection<Row> rows = compilePipeline(sql, pipeline); @@ -146,4 +191,52 @@ public class BeamMinusRelTest extends BaseRelTest { pipeline.run(); } + + @Test + public void testNodeStatsEstimation() { + String sql = + "SELECT order_id, site_id, price " + + "FROM ORDER_DETAILS1 " + + " EXCEPT ALL " + + "SELECT order_id, site_id, price " + + "FROM ORDER_DETAILS2 "; + + RelNode root = env.parseQuery(sql); + + while (!(root instanceof BeamMinusRel)) { + root = root.getInput(0); + } + + NodeStats estimate = BeamSqlRelUtils.getNodeStats(root, root.getCluster().getMetadataQuery()); + + Assert.assertFalse(estimate.isUnknown()); + Assert.assertEquals(0d, estimate.getRate(), 0.01); + + Assert.assertEquals(5. - 3. / 2., estimate.getRowCount(), 0.01); + Assert.assertEquals(5. - 3. 
/ 2., estimate.getWindow(), 0.01); + } + + @Test + public void testNodeStatsEstimationUnbounded() { + String sql = + "SELECT * " + + "FROM " + + "(select order_id FROM ORDER_DETAILS_UNBOUNDED " + + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " + + " EXCEPT ALL " + + " select order_id FROM ORDER_DETAILS_UNBOUNDED " + + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR) "; + + RelNode root = env.parseQuery(sql); + + while (!(root instanceof BeamMinusRel)) { + root = root.getInput(0); + } + + NodeStats estimate = BeamSqlRelUtils.getNodeStats(root, root.getCluster().getMetadataQuery()); + + // note that we have group by + Assert.assertEquals(4d / 2 - 4d / 4, estimate.getRate(), 0.01); + Assert.assertEquals(0d, estimate.getRowCount(), 0.01); + } } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java index 3e058a2..15cf8cb 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java @@ -18,13 +18,16 @@ package org.apache.beam.sdk.extensions.sql.impl.rel; import org.apache.beam.sdk.extensions.sql.TestUtils; +import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats; import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; +import org.apache.calcite.rel.RelNode; import org.joda.time.DateTime; +import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -102,7 +105,7 @@ public class BeamSortRelTest extends BaseRelTest { } @Test - 
public void testOrderBy_basic() throws Exception { + public void testOrderBy_basic() { String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT " + " order_id, site_id, price " @@ -122,7 +125,7 @@ public class BeamSortRelTest extends BaseRelTest { } @Test - public void testOrderBy_timestamp() throws Exception { + public void testOrderBy_timestamp() { String sql = "SELECT order_id, site_id, price, order_time " + "FROM ORDER_DETAILS " @@ -158,7 +161,7 @@ public class BeamSortRelTest extends BaseRelTest { } @Test - public void testOrderBy_nullsFirst() throws Exception { + public void testOrderBy_nullsFirst() { Schema schema = Schema.builder() .addField("order_id", Schema.FieldType.INT64) @@ -188,7 +191,7 @@ public class BeamSortRelTest extends BaseRelTest { } @Test - public void testOrderBy_nullsLast() throws Exception { + public void testOrderBy_nullsLast() { Schema schema = Schema.builder() .addField("order_id", Schema.FieldType.INT64) @@ -218,7 +221,7 @@ public class BeamSortRelTest extends BaseRelTest { } @Test - public void testOrderBy_with_offset2() throws Exception { + public void testOrderBy_with_offset2() { Schema schema = Schema.builder().addField("count_star", Schema.FieldType.INT64).build(); String sql = @@ -232,7 +235,7 @@ public class BeamSortRelTest extends BaseRelTest { } @Test - public void testOrderBy_with_offset() throws Exception { + public void testOrderBy_with_offset() { String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT " + " order_id, site_id, price " @@ -252,7 +255,7 @@ public class BeamSortRelTest extends BaseRelTest { } @Test - public void testOrderBy_bigFetch() throws Exception { + public void testOrderBy_bigFetch() { String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT " + " order_id, site_id, price " @@ -288,4 +291,26 @@ public class BeamSortRelTest extends BaseRelTest { TestPipeline pipeline = TestPipeline.create(); compilePipeline(sql, pipeline); } + + @Test + public void 
testNodeStatsEstimation() { + String sql = + "SELECT order_id, site_id, price, order_time " + + "FROM ORDER_DETAILS " + + "ORDER BY order_time asc limit 11"; + + RelNode root = env.parseQuery(sql); + + while (!(root instanceof BeamSortRel)) { + root = root.getInput(0); + } + + NodeStats estimate = BeamSqlRelUtils.getNodeStats(root, root.getCluster().getMetadataQuery()); + + Assert.assertFalse(estimate.isUnknown()); + Assert.assertEquals(0d, estimate.getRate(), 0.01); + + Assert.assertEquals(10., estimate.getRowCount(), 0.01); + Assert.assertEquals(10., estimate.getWindow(), 0.01); + } } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUncollectRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUncollectRelTest.java new file mode 100644 index 0000000..2992c72 --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUncollectRelTest.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.sql.impl.rel; + +import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats; +import org.apache.calcite.rel.RelNode; +import org.junit.Assert; +import org.junit.Test; + +/** Tests for {@code BeamUncollectRel}. */ +public class BeamUncollectRelTest extends BaseRelTest { + private NodeStats getEstimateOf(String sql) { + RelNode root = env.parseQuery(sql); + + while (!(root instanceof BeamUncollectRel)) { + root = root.getInput(0); + } + + return BeamSqlRelUtils.getNodeStats(root, root.getCluster().getMetadataQuery()); + } + + @Test + public void testNodeStats() { + NodeStats estimate = + getEstimateOf( + "SELECT * FROM UNNEST (SELECT * FROM (VALUES (ARRAY ['a', 'b', 'c']),(ARRAY ['a', 'b', 'c']))) t1"); + + Assert.assertEquals(4d, estimate.getRowCount(), 0.001); + Assert.assertEquals(4d, estimate.getWindow(), 0.001); + Assert.assertEquals(0., estimate.getRate(), 0.001); + } +} diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRelTest.java index b7ee1a8..3ed476d 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRelTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRelTest.java @@ -19,12 +19,15 @@ package org.apache.beam.sdk.extensions.sql.impl.rel; import java.math.BigDecimal; import org.apache.beam.sdk.extensions.sql.TestUtils; +import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats; import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; +import org.apache.calcite.rel.RelNode; +import 
org.junit.Assert; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; @@ -99,4 +102,54 @@ public class BeamUnionRelTest extends BaseRelTest { .getRows()); pipeline.run(); } + + @Test + public void testNodeStatsEstimation() { + String sql = + "SELECT " + + " order_id, site_id, price " + + "FROM ORDER_DETAILS " + + " UNION SELECT " + + " order_id, site_id, price " + + "FROM ORDER_DETAILS "; + + RelNode root = env.parseQuery(sql); + + while (!(root instanceof BeamUnionRel)) { + root = root.getInput(0); + } + + NodeStats estimate = BeamSqlRelUtils.getNodeStats(root, root.getCluster().getMetadataQuery()); + + Assert.assertFalse(estimate.isUnknown()); + Assert.assertEquals(0d, estimate.getRate(), 0.01); + + Assert.assertEquals(2., estimate.getRowCount(), 0.01); + Assert.assertEquals(2., estimate.getWindow(), 0.01); + } + + @Test + public void testNodeStatsEstimationUnionAll() { + String sql = + "SELECT " + + " order_id, site_id, price " + + "FROM ORDER_DETAILS " + + " UNION ALL SELECT " + + " order_id, site_id, price " + + "FROM ORDER_DETAILS "; + + RelNode root = env.parseQuery(sql); + + while (!(root instanceof BeamUnionRel)) { + root = root.getInput(0); + } + + NodeStats estimate = BeamSqlRelUtils.getNodeStats(root, root.getCluster().getMetadataQuery()); + + Assert.assertFalse(estimate.isUnknown()); + Assert.assertEquals(0d, estimate.getRate(), 0.01); + + Assert.assertEquals(4., estimate.getRowCount(), 0.01); + Assert.assertEquals(4., estimate.getWindow(), 0.01); + } } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRelTest.java index 0dc8e26..065b558 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRelTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRelTest.java @@ 
-18,12 +18,15 @@ package org.apache.beam.sdk.extensions.sql.impl.rel; import org.apache.beam.sdk.extensions.sql.TestUtils; +import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats; import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; +import org.apache.calcite.rel.RelNode; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; @@ -104,4 +107,25 @@ public class BeamValuesRelTest extends BaseRelTest { .getRows()); pipeline.run(); } + + @Test + public void testNodeStatsEstimation() { + String sql = + "SELECT * FROM (VALUES ('value1'),('value2'),('value3'),('value4'),('value5')," + + " ('value6'),('value7'),('value8'),('value9'))"; + + RelNode root = env.parseQuery(sql); + + while (!(root instanceof BeamValuesRel)) { + root = root.getInput(0); + } + + NodeStats estimate = BeamSqlRelUtils.getNodeStats(root, root.getCluster().getMetadataQuery()); + + Assert.assertFalse(estimate.isUnknown()); + Assert.assertEquals(0d, estimate.getRate(), 0.01); + + Assert.assertEquals(9., estimate.getRowCount(), 0.01); + Assert.assertEquals(9., estimate.getWindow(), 0.01); + } }