This is an automated email from the ASF dual-hosted git repository.

anton pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ca9149  [BEAM-7844] Implementing NodeStat Estimations for all the 
nodes
     new 7d56a23  Merge pull request #9198 from riazela/RowRateWindowEstimation
3ca9149 is described below

commit 3ca91490e0644ab89e5febc9d139402877f939e1
Author: Alireza Samadian <alireza4...@gmail.com>
AuthorDate: Thu Aug 1 09:02:11 2019 -0700

    [BEAM-7844] Implementing NodeStat Estimations for all the nodes
---
 .../sql/impl/planner/NodeStatsMetadata.java        |  4 +-
 .../sql/impl/rel/BeamAggregationRel.java           | 39 ++++++++-
 .../sdk/extensions/sql/impl/rel/BeamCalcRel.java   | 22 ++++-
 .../sdk/extensions/sql/impl/rel/BeamIOSinkRel.java |  2 +-
 .../extensions/sql/impl/rel/BeamIOSourceRel.java   |  9 +-
 .../extensions/sql/impl/rel/BeamIntersectRel.java  | 14 ++-
 .../sdk/extensions/sql/impl/rel/BeamJoinRel.java   | 26 ++++--
 .../sdk/extensions/sql/impl/rel/BeamMinusRel.java  |  8 +-
 .../sdk/extensions/sql/impl/rel/BeamSortRel.java   |  3 +-
 .../extensions/sql/impl/rel/BeamSqlRelUtils.java   | 19 +++++
 .../extensions/sql/impl/rel/BeamUncollectRel.java  |  5 +-
 .../sdk/extensions/sql/impl/rel/BeamUnionRel.java  | 10 ++-
 .../sdk/extensions/sql/impl/rel/BeamUnnestRel.java |  5 +-
 .../sdk/extensions/sql/impl/rel/BeamValuesRel.java |  2 +-
 .../extensions/sql/impl/planner/NodeStatsTest.java | 15 ++++
 ...rceRelTest.java => BeamAggregationRelTest.java} | 70 ++++++++++-----
 ...amIOSourceRelTest.java => BeamCalcRelTest.java} | 77 ++++++++++++-----
 .../sql/impl/rel/BeamIOSourceRelTest.java          | 43 +++++++++-
 .../sql/impl/rel/BeamIntersectRelTest.java         | 27 ++++++
 .../impl/rel/BeamJoinRelBoundedVsBoundedTest.java  | 75 ++++++++++++++++
 .../rel/BeamJoinRelUnboundedVsBoundedTest.java     | 41 +++++++++
 .../rel/BeamJoinRelUnboundedVsUnboundedTest.java   | 50 ++++++++++-
 .../extensions/sql/impl/rel/BeamMinusRelTest.java  | 99 +++++++++++++++++++++-
 .../extensions/sql/impl/rel/BeamSortRelTest.java   | 39 +++++++--
 .../sql/impl/rel/BeamUncollectRelTest.java         | 47 ++++++++++
 .../extensions/sql/impl/rel/BeamUnionRelTest.java  | 53 ++++++++++++
 .../extensions/sql/impl/rel/BeamValuesRelTest.java | 24 ++++++
 27 files changed, 745 insertions(+), 83 deletions(-)

diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/NodeStatsMetadata.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/NodeStatsMetadata.java
index 4a9e79f..8bc62ee 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/NodeStatsMetadata.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/NodeStatsMetadata.java
@@ -35,8 +35,8 @@ public interface NodeStatsMetadata extends Metadata {
   MetadataDef<NodeStatsMetadata> DEF =
       MetadataDef.of(NodeStatsMetadata.class, NodeStatsMetadata.Handler.class, 
METHOD);
 
-  // In order to use this we need to call it by 
relNode.metadata(RowRateWindowMetadata.class,
-  // mq).getRowRateWindow() where mq is the MetadataQuery (can be obtained by
+  // In order to use this we need to call it by 
relNode.metadata(NodeStatsMetadata.class,
+  // mq).getNodeStats() where mq is the MetadataQuery (can be obtained by
   // relNode.getCluster().getMetadataQuery()). After this, Calcite looks for 
the implementation of
   // this metadata that we have registered in MetadataProvider (it is 
RelMdNodeStats.class in
   // this case and we have registered it in CalciteQueryPlanner). Then 
Calcite's generated Code
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
index 14e7475..4e5978e 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
@@ -84,7 +84,44 @@ public class BeamAggregationRel extends Aggregate implements 
BeamRelNode {
 
   @Override
   public NodeStats estimateNodeStats(RelMetadataQuery mq) {
-    return NodeStats.create(mq.getRowCount(this));
+
+    NodeStats inputEstimate = BeamSqlRelUtils.getNodeStats(this.input, mq);
+
+    inputEstimate = computeWindowingCostEffect(inputEstimate);
+
+    NodeStats estimate;
+    // groupCount shows how many columns we have in the group by. One of them 
might be the windowing.
+    int groupCount = groupSet.cardinality() - (windowFn == null ? 0 : 1);
+    // This is similar to what Calcite does. If groupCount is zero then we 
have only one value
+    // per window for unbounded and we have only one value for bounded, e.g. 
select count(*) from A.
+    // If groupCount is nonzero, then the more columns we include in the group 
by, the more rows will be
+    // preserved.
+    return (groupCount == 0)
+        ? NodeStats.create(
+            Math.min(inputEstimate.getRowCount(), 1d),
+            inputEstimate.getRate() / inputEstimate.getWindow(),
+            1d)
+        : inputEstimate.multiply(1.0 - Math.pow(.5, groupCount));
+  }
+
+  private NodeStats computeWindowingCostEffect(NodeStats inputStat) {
+    if (windowFn == null) {
+      return inputStat;
+    }
+    WindowFn w = windowFn;
+    double multiplicationFactor = 1;
+    // If the window is a sliding window, the number of tuples will increase 
(because some of the
+    // tuples repeat in multiple windows).
+    if (w instanceof SlidingWindows) {
+      multiplicationFactor =
+          ((double) ((SlidingWindows) w).getSize().getStandardSeconds())
+              / ((SlidingWindows) w).getPeriod().getStandardSeconds();
+    }
+
+    return NodeStats.create(
+        inputStat.getRowCount() * multiplicationFactor,
+        inputStat.getRate() * multiplicationFactor,
+        BeamIOSourceRel.CONSTANT_WINDOW_SIZE);
   }
 
   @Override
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
index 31ae94a..755aaec 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
@@ -73,6 +73,8 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Calc;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexLocalRef;
+import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexProgram;
 import org.apache.calcite.rex.RexSimplify;
 import org.apache.calcite.rex.RexUtil;
@@ -216,7 +218,25 @@ public class BeamCalcRel extends Calc implements 
BeamRelNode {
 
   @Override
   public NodeStats estimateNodeStats(RelMetadataQuery mq) {
-    return NodeStats.create(mq.getRowCount(this));
+    NodeStats inputStat = BeamSqlRelUtils.getNodeStats(this.input, mq);
+    double selectivity = estimateFilterSelectivity(getInput(), program, mq);
+
+    return inputStat.multiply(selectivity);
+  }
+
+  private static double estimateFilterSelectivity(
+      RelNode child, RexProgram program, RelMetadataQuery mq) {
+    // Similar to Calcite, if the calc node represents a filter operation, 
we estimate the filter
+    // selectivity based on the number of equality conditions, number of 
inequality conditions, etc.
+    RexLocalRef programCondition = program.getCondition();
+    RexNode condition;
+    if (programCondition == null) {
+      condition = null;
+    } else {
+      condition = program.expandLocalRef(programCondition);
+    }
+    // Currently this gets the selectivity based on Calcite's Selectivity 
Handler (RelMdSelectivity)
+    return mq.getSelectivity(child, condition);
   }
 
   public boolean isInputSortRelAndLimitOnly() {
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java
index 5aaa635..5738e44 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java
@@ -74,7 +74,7 @@ public class BeamIOSinkRel extends TableModify
 
   @Override
   public NodeStats estimateNodeStats(RelMetadataQuery mq) {
-    return NodeStats.create(mq.getRowCount(this));
+    return BeamSqlRelUtils.getNodeStats(this.input, mq);
   }
 
   @Override
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
index d87f152..6305924 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
@@ -37,7 +37,7 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery;
 
 /** BeamRelNode to replace a {@code TableScan} node. */
 public class BeamIOSourceRel extends TableScan implements BeamRelNode {
-
+  public static final double CONSTANT_WINDOW_SIZE = 10d;
   private final BeamSqlTable beamTable;
   private final BeamCalciteTable calciteTable;
   private final Map<String, String> pipelineOptions;
@@ -66,7 +66,12 @@ public class BeamIOSourceRel extends TableScan implements 
BeamRelNode {
 
   @Override
   public NodeStats estimateNodeStats(RelMetadataQuery mq) {
-    return NodeStats.create(mq.getRowCount(this));
+    BeamTableStatistics rowCountStatistics = calciteTable.getStatistic();
+    double window =
+        (beamTable.isBounded() == PCollection.IsBounded.BOUNDED)
+            ? rowCountStatistics.getRowCount()
+            : CONSTANT_WINDOW_SIZE;
+    return NodeStats.create(rowCountStatistics.getRowCount(), 
rowCountStatistics.getRate(), window);
   }
 
   @Override
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java
index a90972a..7fcc6c2 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java
@@ -54,6 +54,18 @@ public class BeamIntersectRel extends Intersect implements 
BeamRelNode {
 
   @Override
   public NodeStats estimateNodeStats(RelMetadataQuery mq) {
-    return NodeStats.create(mq.getRowCount(this));
+    // This takes the minimum of the inputs for all the estimate factors.
+    double minimumRows = Double.POSITIVE_INFINITY;
+    double minimumWindowSize = Double.POSITIVE_INFINITY;
+    double minimumRate = Double.POSITIVE_INFINITY;
+
+    for (RelNode input : inputs) {
+      NodeStats inputEstimates = BeamSqlRelUtils.getNodeStats(input, mq);
+      minimumRows = Math.min(minimumRows, inputEstimates.getRowCount());
+      minimumRate = Math.min(minimumRate, inputEstimates.getRate());
+      minimumWindowSize = Math.min(minimumWindowSize, 
inputEstimates.getWindow());
+    }
+
+    return NodeStats.create(minimumRows, minimumRate, 
minimumWindowSize).multiply(0.5);
   }
 }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
index 167b8a5..4ec6b76 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
@@ -57,8 +57,6 @@ import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Optional;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptCost;
-import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.CorrelationId;
@@ -138,13 +136,25 @@ public class BeamJoinRel extends Join implements 
BeamRelNode {
   }
 
   @Override
-  public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery 
mq) {
-    return super.computeSelfCost(planner, mq);
-  }
-
-  @Override
   public NodeStats estimateNodeStats(RelMetadataQuery mq) {
-    return NodeStats.create(mq.getRowCount(this));
+    double selectivity = mq.getSelectivity(this, getCondition());
+    NodeStats leftEstimates = BeamSqlRelUtils.getNodeStats(this.left, mq);
+    NodeStats rightEstimates = BeamSqlRelUtils.getNodeStats(this.right, mq);
+
+    if (leftEstimates.isUnknown() || rightEstimates.isUnknown()) {
+      return NodeStats.UNKNOWN;
+    }
+    // If any of the inputs is unbounded, the row count becomes zero (one of them 
would be zero).
+    // If one is bounded and one unbounded, the rate will be the window of the 
bounded (= its row count)
+    // multiplied by the rate of the unbounded one.
+    // If both are unbounded, the rate will be the sum of each rate multiplied 
by the window of the
+    // other.
+    return NodeStats.create(
+        leftEstimates.getRowCount() * rightEstimates.getRowCount() * 
selectivity,
+        (leftEstimates.getRate() * rightEstimates.getWindow()
+                + rightEstimates.getRate() * leftEstimates.getWindow())
+            * selectivity,
+        leftEstimates.getWindow() * rightEstimates.getWindow() * selectivity);
   }
 
   /**
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java
index 0a8103d..416c86b 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java
@@ -54,6 +54,12 @@ public class BeamMinusRel extends Minus implements 
BeamRelNode {
 
   @Override
   public NodeStats estimateNodeStats(RelMetadataQuery mq) {
-    return NodeStats.create(mq.getRowCount(this));
+    NodeStats firstInputEstimates = 
BeamSqlRelUtils.getNodeStats(inputs.get(0), mq);
+    // The first input minus half of the others. (We are assuming half of them 
have intersection)
+    for (int i = 1; i < inputs.size(); i++) {
+      NodeStats inputEstimate = BeamSqlRelUtils.getNodeStats(inputs.get(i), 
mq);
+      firstInputEstimates = 
firstInputEstimates.minus(inputEstimate.multiply(0.5));
+    }
+    return firstInputEstimates;
   }
 }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
index f252f11..61f7858 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
@@ -138,7 +138,8 @@ public class BeamSortRel extends Sort implements 
BeamRelNode {
 
   @Override
   public NodeStats estimateNodeStats(RelMetadataQuery mq) {
-    return NodeStats.create(mq.getRowCount(this));
+    // Sorting does not change rate or row count of the input.
+    return BeamSqlRelUtils.getNodeStats(this.input, mq);
   }
 
   public boolean isLimitOnly() {
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSqlRelUtils.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSqlRelUtils.java
index 9f0fd55..fb44f28 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSqlRelUtils.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSqlRelUtils.java
@@ -22,12 +22,15 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStatsMetadata;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.Row;
 import org.apache.calcite.plan.volcano.RelSubset;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
 
 /** Utilities for {@code BeamRelNode}. */
 public class BeamSqlRelUtils {
@@ -76,4 +79,20 @@ public class BeamSqlRelUtils {
     }
     return (BeamRelNode) input;
   }
+
+  public static RelNode getInput(RelNode input) {
+    RelNode result = input;
+    if (input instanceof RelSubset) {
+      // go with known best input
+      result = ((RelSubset) input).getBest();
+      result = result == null ? ((RelSubset) input).getOriginal() : result;
+    }
+
+    return result;
+  }
+
+  public static NodeStats getNodeStats(RelNode input, RelMetadataQuery mq) {
+    input = getInput(input);
+    return input.metadata(NodeStatsMetadata.class, mq).getNodeStats();
+  }
 }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUncollectRel.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUncollectRel.java
index 28d2d69..7bca5c4 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUncollectRel.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUncollectRel.java
@@ -76,7 +76,10 @@ public class BeamUncollectRel extends Uncollect implements 
BeamRelNode {
 
   @Override
   public NodeStats estimateNodeStats(RelMetadataQuery mq) {
-    return NodeStats.create(mq.getRowCount(this));
+    // We estimate the average length of each array by a constant.
+    // We might be able to get an estimate of the length by making a 
MetadataHandler for this
+    // purpose, and get the estimate by reading the first couple of the rows 
in the source.
+    return BeamSqlRelUtils.getNodeStats(this.input, mq).multiply(2);
   }
 
   private static class UncollectDoFn extends DoFn<Row, Row> {
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java
index 95a1826..b4a6670 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java
@@ -81,6 +81,14 @@ public class BeamUnionRel extends Union implements 
BeamRelNode {
 
   @Override
   public NodeStats estimateNodeStats(RelMetadataQuery mq) {
-    return NodeStats.create(mq.getRowCount(this));
+    // The summation of the input stats
+    NodeStats summationOfEstimates =
+        inputs.stream()
+            .map(input -> BeamSqlRelUtils.getNodeStats(input, mq))
+            .reduce(NodeStats.create(0, 0, 0), NodeStats::plus);
+    // If all is set then we propagate duplicated values. Otherwise we assume 
a constant factor of
+    // them are duplicate.
+    summationOfEstimates = all ? summationOfEstimates : 
summationOfEstimates.multiply(0.5);
+    return summationOfEstimates;
   }
 }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnnestRel.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnnestRel.java
index b51df06..1687dec 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnnestRel.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnnestRel.java
@@ -78,7 +78,10 @@ public class BeamUnnestRel extends Uncollect implements 
BeamRelNode {
 
   @Override
   public NodeStats estimateNodeStats(RelMetadataQuery mq) {
-    return NodeStats.create(mq.getRowCount(this));
+    // We estimate the average length of each array by a constant.
+    // We might be able to get an estimate of the length by making a 
MetadataHandler for this
+    // purpose, and get the estimate by reading the first couple of the rows 
in the source.
+    return BeamSqlRelUtils.getNodeStats(this.input, mq).multiply(2);
   }
 
   @Override
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
index c4405ba..1ad51b6 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
@@ -96,6 +96,6 @@ public class BeamValuesRel extends Values implements 
BeamRelNode {
 
   @Override
   public NodeStats estimateNodeStats(RelMetadataQuery mq) {
-    return NodeStats.create(mq.getRowCount(this));
+    return NodeStats.create(tuples.size(), 0, tuples.size());
   }
 }
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/planner/NodeStatsTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/planner/NodeStatsTest.java
index 820f4fc..10e0b61 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/planner/NodeStatsTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/planner/NodeStatsTest.java
@@ -18,10 +18,12 @@
 package org.apache.beam.sdk.extensions.sql.impl.planner;
 
 import org.apache.beam.sdk.extensions.sql.impl.rel.BaseRelTest;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
 import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.plan.volcano.RelSubset;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.SingleRel;
 import org.junit.Assert;
@@ -76,4 +78,17 @@ public class NodeStatsTest extends BaseRelTest {
         root.metadata(NodeStatsMetadata.class, 
root.getCluster().getMetadataQuery()).getNodeStats();
     Assert.assertFalse(nodeStats.isUnknown());
   }
+
+  @Test
+  public void testSubsetHavingBest() {
+    String sql = " select * from ORDER_DETAILS1 ";
+    RelNode root = env.parseQuery(sql);
+    root = root.getCluster().getPlanner().getRoot();
+
+    // tests if we are actually testing what we want.
+    Assert.assertTrue(root instanceof RelSubset);
+
+    NodeStats estimates = BeamSqlRelUtils.getNodeStats(root, 
root.getCluster().getMetadataQuery());
+    Assert.assertFalse(estimates.isUnknown());
+  }
 }
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRelTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRelTest.java
similarity index 60%
copy from 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRelTest.java
copy to 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRelTest.java
index 3da71e0..df9305b 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRelTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRelTest.java
@@ -19,25 +19,21 @@ package org.apache.beam.sdk.extensions.sql.impl.rel;
 
 import java.math.BigDecimal;
 import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
 import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
 import 
org.apache.beam.sdk.extensions.sql.meta.provider.test.TestUnboundedTable;
 import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.joda.time.DateTime;
 import org.joda.time.Duration;
 import org.junit.Assert;
 import org.junit.BeforeClass;
-import org.junit.Rule;
 import org.junit.Test;
 
-/** Test for {@code BeamIOSourceRel}. */
-public class BeamIOSourceRelTest extends BaseRelTest {
-  @Rule public final TestPipeline pipeline = TestPipeline.create();
-
-  public static final DateTime FIRST_DATE = new DateTime(1);
-  public static final DateTime SECOND_DATE = new DateTime(1 + 3600 * 1000);
+/** Tests related to {@code BeamAggregationRel}. */
+public class BeamAggregationRelTest extends BaseRelTest {
+  private static final DateTime FIRST_DATE = new DateTime(1);
+  private static final DateTime SECOND_DATE = new DateTime(1 + 3600 * 1000);
 
   private static final Duration WINDOW_SIZE = Duration.standardHours(1);
 
@@ -100,29 +96,57 @@ public class BeamIOSourceRelTest extends BaseRelTest {
             
.setStatistics(BeamTableStatistics.createUnboundedTableStatistics(2d)));
   }
 
-  @Test
-  public void boundedRowCount() {
-    String sql = "SELECT * FROM ORDER_DETAILS_BOUNDED";
-
+  private NodeStats getEstimateOf(String sql) {
     RelNode root = env.parseQuery(sql);
 
-    while (!(root instanceof BeamIOSourceRel)) {
-      root = env.parseQuery(sql).getInput(0);
+    while (!(root instanceof BeamAggregationRel)) {
+      root = root.getInput(0);
     }
 
-    Assert.assertEquals(5d, 
root.estimateRowCount(RelMetadataQuery.instance()), 0.001);
+    return BeamSqlRelUtils.getNodeStats(root, 
root.getCluster().getMetadataQuery());
   }
 
   @Test
-  public void unboundedRowCount() {
-    String sql = "SELECT * FROM ORDER_DETAILS_UNBOUNDED";
+  public void testNodeStats() {
+    String sql = "SELECT order_id FROM ORDER_DETAILS_BOUNDED " + " GROUP BY 
order_id ";
 
-    RelNode root = env.parseQuery(sql);
+    NodeStats estimate = getEstimateOf(sql);
 
-    while (!(root instanceof BeamIOSourceRel)) {
-      root = env.parseQuery(sql).getInput(0);
-    }
+    Assert.assertEquals(5d / 2, estimate.getRowCount(), 0.001);
+    Assert.assertEquals(5d / 2, estimate.getWindow(), 0.001);
+    Assert.assertEquals(0., estimate.getRate(), 0.001);
+  }
+
+  @Test
+  public void testNodeStatsEffectOfGroupSet() {
+    String sql1 = "SELECT order_id FROM ORDER_DETAILS_BOUNDED " + " GROUP BY 
order_id ";
+    String sql2 =
+        "SELECT order_id, site_id FROM ORDER_DETAILS_BOUNDED " + " GROUP BY 
order_id, site_id ";
 
-    Assert.assertEquals(2d, 
root.estimateRowCount(RelMetadataQuery.instance()), 0.001);
+    NodeStats estimate1 = getEstimateOf(sql1);
+
+    NodeStats estimate2 = getEstimateOf(sql2);
+
+    Assert.assertTrue(estimate1.getRowCount() < estimate2.getRowCount());
+    Assert.assertTrue(estimate1.getWindow() < estimate2.getWindow());
+  }
+
+  @Test
+  public void testNodeStatsUnboundedWindow() {
+    String sql =
+        "select order_id, sum(site_id) as sum_site_id FROM 
ORDER_DETAILS_UNBOUNDED "
+            + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)";
+    NodeStats estimate1 = getEstimateOf(sql);
+    Assert.assertEquals(1d, estimate1.getRate(), 0.01);
+    Assert.assertEquals(BeamIOSourceRel.CONSTANT_WINDOW_SIZE / 2, 
estimate1.getWindow(), 0.01);
+  }
+
+  @Test
+  public void testNodeStatsSlidingWindow() {
+    String sql =
+        "select order_id, sum(site_id) as sum_site_id FROM 
ORDER_DETAILS_UNBOUNDED "
+            + " GROUP BY order_id, HOP(order_time, INTERVAL '1' 
SECOND,INTERVAL '3' SECOND)";
+    NodeStats estimate1 = getEstimateOf(sql);
+    Assert.assertEquals(3d, estimate1.getRate(), 0.01);
   }
 }
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRelTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRelTest.java
similarity index 55%
copy from 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRelTest.java
copy to 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRelTest.java
index 3da71e0..ad64f0d 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRelTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRelTest.java
@@ -19,25 +19,21 @@ package org.apache.beam.sdk.extensions.sql.impl.rel;
 
 import java.math.BigDecimal;
 import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
 import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
 import 
org.apache.beam.sdk.extensions.sql.meta.provider.test.TestUnboundedTable;
 import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.joda.time.DateTime;
 import org.joda.time.Duration;
 import org.junit.Assert;
 import org.junit.BeforeClass;
-import org.junit.Rule;
 import org.junit.Test;
 
-/** Test for {@code BeamIOSourceRel}. */
-public class BeamIOSourceRelTest extends BaseRelTest {
-  @Rule public final TestPipeline pipeline = TestPipeline.create();
-
-  public static final DateTime FIRST_DATE = new DateTime(1);
-  public static final DateTime SECOND_DATE = new DateTime(1 + 3600 * 1000);
+/** Tests related to {@code BeamCalcRel}. */
+public class BeamCalcRelTest extends BaseRelTest {
+  private static final DateTime FIRST_DATE = new DateTime(1);
+  private static final DateTime SECOND_DATE = new DateTime(1 + 3600 * 1000);
 
   private static final Duration WINDOW_SIZE = Duration.standardHours(1);
 
@@ -101,28 +97,67 @@ public class BeamIOSourceRelTest extends BaseRelTest {
   }
 
   @Test
-  public void boundedRowCount() {
-    String sql = "SELECT * FROM ORDER_DETAILS_BOUNDED";
+  public void testProjectionNodeStats() {
+    String sql = "SELECT order_id FROM ORDER_DETAILS_BOUNDED";
 
     RelNode root = env.parseQuery(sql);
 
-    while (!(root instanceof BeamIOSourceRel)) {
-      root = env.parseQuery(sql).getInput(0);
-    }
+    Assert.assertTrue(root instanceof BeamCalcRel);
+
+    NodeStats estimate = BeamSqlRelUtils.getNodeStats(root, 
root.getCluster().getMetadataQuery());
 
-    Assert.assertEquals(5d, 
root.estimateRowCount(RelMetadataQuery.instance()), 0.001);
+    Assert.assertEquals(5d, estimate.getRowCount(), 0.001);
+    Assert.assertEquals(5d, estimate.getWindow(), 0.001);
+    Assert.assertEquals(0., estimate.getRate(), 0.001);
   }
 
   @Test
-  public void unboundedRowCount() {
-    String sql = "SELECT * FROM ORDER_DETAILS_UNBOUNDED";
+  public void testFilterNodeStats() {
+    String sql = "SELECT * FROM ORDER_DETAILS_BOUNDED where order_id=1";
 
     RelNode root = env.parseQuery(sql);
 
-    while (!(root instanceof BeamIOSourceRel)) {
-      root = env.parseQuery(sql).getInput(0);
-    }
+    Assert.assertTrue(root instanceof BeamCalcRel);
+
+    NodeStats estimate = BeamSqlRelUtils.getNodeStats(root, 
root.getCluster().getMetadataQuery());
+
+    Assert.assertTrue(5d > estimate.getRowCount());
+    Assert.assertTrue(5d > estimate.getWindow());
+    Assert.assertEquals(0., estimate.getRate(), 0.001);
+  }
+
+  @Test // A ">=" filter must get larger estimates than an "=" filter on the same column.
+  public void testNodeStatsConditionType() {
+    String equalSql = "SELECT * FROM ORDER_DETAILS_BOUNDED where order_id=1";
+    String geqSql = "SELECT * FROM ORDER_DETAILS_BOUNDED where order_id>=1";
+    // Plan both queries; the stats are read from the root node of each plan.
+    RelNode equalRoot = env.parseQuery(equalSql);
+    RelNode geqRoot = env.parseQuery(geqSql);
+    // Each root is queried with the metadata query of its own cluster.
+    NodeStats equalEstimate =
+        BeamSqlRelUtils.getNodeStats(equalRoot, 
equalRoot.getCluster().getMetadataQuery());
+    NodeStats geqEstimate =
+        BeamSqlRelUtils.getNodeStats(geqRoot, 
geqRoot.getCluster().getMetadataQuery());
+    // The range predicate must yield strictly larger row-count and window estimates.
+    Assert.assertTrue(geqEstimate.getRowCount() > equalEstimate.getRowCount());
+    Assert.assertTrue(geqEstimate.getWindow() > equalEstimate.getWindow());
+  }
+
+  @Test
+  public void testNodeStatsNumberOfConditions() {
+    String equalSql = "SELECT * FROM ORDER_DETAILS_BOUNDED where order_id=1";
+    String doubleEqualSql = "SELECT * FROM ORDER_DETAILS_BOUNDED WHERE 
order_id=1 AND site_id=2 ";
+
+    RelNode equalRoot = env.parseQuery(equalSql);
+    RelNode doubleEqualRoot = env.parseQuery(doubleEqualSql);
+
+    NodeStats equalEstimate =
+        BeamSqlRelUtils.getNodeStats(equalRoot, 
equalRoot.getCluster().getMetadataQuery());
+    NodeStats doubleEqualEstimate =
+        BeamSqlRelUtils.getNodeStats(
+            doubleEqualRoot, doubleEqualRoot.getCluster().getMetadataQuery());
 
-    Assert.assertEquals(2d, 
root.estimateRowCount(RelMetadataQuery.instance()), 0.001);
+    Assert.assertTrue(doubleEqualEstimate.getRowCount() < 
equalEstimate.getRowCount());
+    Assert.assertTrue(doubleEqualEstimate.getWindow() < 
equalEstimate.getWindow());
   }
 }
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRelTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRelTest.java
index 3da71e0..22fb229 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRelTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRelTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.extensions.sql.impl.rel;
 
 import java.math.BigDecimal;
 import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
 import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
 import 
org.apache.beam.sdk.extensions.sql.meta.provider.test.TestUnboundedTable;
 import org.apache.beam.sdk.schemas.Schema;
@@ -36,8 +37,8 @@ import org.junit.Test;
 public class BeamIOSourceRelTest extends BaseRelTest {
   @Rule public final TestPipeline pipeline = TestPipeline.create();
 
-  public static final DateTime FIRST_DATE = new DateTime(1);
-  public static final DateTime SECOND_DATE = new DateTime(1 + 3600 * 1000);
+  private static final DateTime FIRST_DATE = new DateTime(1);
+  private static final DateTime SECOND_DATE = new DateTime(1 + 3600 * 1000);
 
   private static final Duration WINDOW_SIZE = Duration.standardHours(1);
 
@@ -107,7 +108,7 @@ public class BeamIOSourceRelTest extends BaseRelTest {
     RelNode root = env.parseQuery(sql);
 
     while (!(root instanceof BeamIOSourceRel)) {
-      root = env.parseQuery(sql).getInput(0);
+      root = root.getInput(0);
     }
 
     Assert.assertEquals(5d, 
root.estimateRowCount(RelMetadataQuery.instance()), 0.001);
@@ -120,9 +121,43 @@ public class BeamIOSourceRelTest extends BaseRelTest {
     RelNode root = env.parseQuery(sql);
 
     while (!(root instanceof BeamIOSourceRel)) {
-      root = env.parseQuery(sql).getInput(0);
+      root = root.getInput(0);
     }
 
     Assert.assertEquals(2d, 
root.estimateRowCount(RelMetadataQuery.instance()), 0.001);
   }
+
+  @Test // NodeStats of the IO source node when selecting from ORDER_DETAILS_BOUNDED.
+  public void testBoundedNodeStats() {
+    String sql = "SELECT * FROM ORDER_DETAILS_BOUNDED";
+    // Plan the query, then descend through first inputs until the IO source node.
+    RelNode root = env.parseQuery(sql);
+
+    while (!(root instanceof BeamIOSourceRel)) {
+      root = root.getInput(0);
+    }
+
+    NodeStats estimate = BeamSqlRelUtils.getNodeStats(root, 
root.getCluster().getMetadataQuery());
+    // Expected: 5 rows, a rate of 0, and a window of 5 (same value as the row count).
+    Assert.assertEquals(5d, estimate.getRowCount(), 0.01);
+    Assert.assertEquals(0d, estimate.getRate(), 0.01);
+    Assert.assertEquals(5d, estimate.getWindow(), 0.01);
+  }
+
+  @Test // NodeStats of the IO source node when selecting from ORDER_DETAILS_UNBOUNDED.
+  public void testUnboundedNodeStats() {
+    String sql = "SELECT * FROM ORDER_DETAILS_UNBOUNDED";
+    // Plan the query, then descend through first inputs until the IO source node.
+    RelNode root = env.parseQuery(sql);
+
+    while (!(root instanceof BeamIOSourceRel)) {
+      root = root.getInput(0);
+    }
+
+    NodeStats estimate = BeamSqlRelUtils.getNodeStats(root, 
root.getCluster().getMetadataQuery());
+    // Expected: row count 0, rate 2, and the window equal to CONSTANT_WINDOW_SIZE.
+    Assert.assertEquals(0d, estimate.getRowCount(), 0.01);
+    Assert.assertEquals(2d, estimate.getRate(), 0.01);
+    Assert.assertEquals(BeamIOSourceRel.CONSTANT_WINDOW_SIZE, 
estimate.getWindow(), 0.01);
+  }
 }
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRelTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRelTest.java
index 1bcbed4..2b58272 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRelTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRelTest.java
@@ -19,12 +19,15 @@ package org.apache.beam.sdk.extensions.sql.impl.rel;
 
 import java.math.BigDecimal;
 import org.apache.beam.sdk.extensions.sql.TestUtils;
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
 import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.Row;
+import org.apache.calcite.rel.RelNode;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
@@ -121,4 +124,28 @@ public class BeamIntersectRelTest extends BaseRelTest {
 
     pipeline.run();
   }
+
+  @Test // NodeStats of the INTERSECT node over ORDER_DETAILS1 and ORDER_DETAILS2.
+  public void testNodeStatsEstimation() {
+    String sql =
+        "SELECT order_id, site_id, price "
+            + " FROM ORDER_DETAILS1 "
+            + " INTERSECT "
+            + " SELECT order_id, site_id, price "
+            + " FROM ORDER_DETAILS2 ";
+    // Plan the query, then descend through first inputs until the intersect node.
+    RelNode root = env.parseQuery(sql);
+
+    while (!(root instanceof BeamIntersectRel)) {
+      root = root.getInput(0);
+    }
+
+    NodeStats estimate = BeamSqlRelUtils.getNodeStats(root, 
root.getCluster().getMetadataQuery());
+    // The estimate must be known and must report a rate of 0.
+    Assert.assertFalse(estimate.isUnknown());
+    Assert.assertEquals(0d, estimate.getRate(), 0.01);
+    // Row count and window are both expected to be 3/2 = 1.5.
+    Assert.assertEquals(3. / 2., estimate.getRowCount(), 0.01);
+    Assert.assertEquals(3. / 2., estimate.getWindow(), 0.01);
+  }
 }
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java
index d8e8a61..f208ecc 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java
@@ -18,13 +18,16 @@
 package org.apache.beam.sdk.extensions.sql.impl.rel;
 
 import org.apache.beam.sdk.extensions.sql.TestUtils;
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
 import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.Row;
+import org.apache.calcite.rel.RelNode;
 import org.hamcrest.core.StringContains;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
@@ -82,6 +85,78 @@ public class BeamJoinRelBoundedVsBoundedTest extends 
BaseRelTest {
   }
 
   @Test
+  public void testNodeStatsEstimation() { // join estimate must be non-zero, below the product
+    String sql =
+        "SELECT *  "
+            + " FROM ORDER_DETAILS1 o1 "
+            + " JOIN ORDER_DETAILS2 o2 "
+            + " on "
+            + " o1.order_id=o2.site_id ";
+    // Plan the query, then descend through first inputs until the join node.
+    RelNode root = env.parseQuery(sql);
+
+    while (!(root instanceof BeamJoinRel)) {
+      root = root.getInput(0);
+    }
+    // Estimates for the join itself and for its left and right inputs.
+    NodeStats estimate = BeamSqlRelUtils.getNodeStats(root, 
root.getCluster().getMetadataQuery());
+    NodeStats leftEstimate =
+        BeamSqlRelUtils.getNodeStats(
+            ((BeamJoinRel) root).getLeft(), 
root.getCluster().getMetadataQuery());
+    NodeStats rightEstimate =
+        BeamSqlRelUtils.getNodeStats(
+            ((BeamJoinRel) root).getRight(), 
root.getCluster().getMetadataQuery());
+    // The estimate must be known and must report a rate of 0.
+    Assert.assertFalse(estimate.isUnknown());
+    Assert.assertEquals(0d, estimate.getRate(), 0.01);
+    // Row count: non-zero and strictly below the product of the input row counts.
+    Assert.assertNotEquals(0d, estimate.getRowCount(), 0.001);
+    Assert.assertTrue(
+        estimate.getRowCount() < leftEstimate.getRowCount() * 
rightEstimate.getRowCount());
+    // Window: non-zero and strictly below the product of the input windows.
+    Assert.assertNotEquals(0d, estimate.getWindow(), 0.001);
+    Assert.assertTrue(estimate.getWindow() < leftEstimate.getWindow() * 
rightEstimate.getWindow());
+  }
+
+  @Test // Adding a second equality condition to a join must lower its row-count estimate.
+  public void testNodeStatsOfMoreConditions() {
+    String sql1 =
+        "SELECT *  "
+            + " FROM ORDER_DETAILS1 o1 "
+            + " JOIN ORDER_DETAILS2 o2 "
+            + " on "
+            + " o1.order_id=o2.site_id ";
+    // Same join as sql1 with one additional equality condition.
+    String sql2 =
+        "SELECT *  "
+            + " FROM ORDER_DETAILS1 o1 "
+            + " JOIN ORDER_DETAILS2 o2 "
+            + " on "
+            + " o1.order_id=o2.site_id AND o2.price=o1.site_id";
+    // Plan each query separately and descend through first inputs to its join node.
+    RelNode root1 = env.parseQuery(sql1);
+
+    while (!(root1 instanceof BeamJoinRel)) {
+      root1 = root1.getInput(0);
+    }
+
+    RelNode root2 = env.parseQuery(sql2);
+
+    while (!(root2 instanceof BeamJoinRel)) {
+      root2 = root2.getInput(0);
+    }
+    // Each join is estimated with the metadata query of its own plan's cluster.
+    NodeStats estimate1 =
+        BeamSqlRelUtils.getNodeStats(root1, 
root1.getCluster().getMetadataQuery());
+    NodeStats estimate2 =
+        BeamSqlRelUtils.getNodeStats(root2, 
root2.getCluster().getMetadataQuery());
+
+    Assert.assertNotEquals(0d, estimate2.getRowCount(), 0.001);
+    // A join with two conditions should have a lower estimate than with one.
+    Assert.assertTrue(estimate2.getRowCount() < estimate1.getRowCount());
+  }
+
+  @Test
   public void testLeftOuterJoin() throws Exception {
     String sql =
         "SELECT *  "
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
index 190b091..e1a5d8b 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
@@ -21,6 +21,7 @@ import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.BeamSqlSeekableTable;
 import org.apache.beam.sdk.extensions.sql.TestUtils;
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
 import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable;
 import 
org.apache.beam.sdk.extensions.sql.impl.transform.BeamSqlOutputToConsoleFn;
 import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
@@ -34,8 +35,10 @@ import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
+import org.apache.calcite.rel.RelNode;
 import org.joda.time.DateTime;
 import org.joda.time.Duration;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
@@ -178,6 +181,44 @@ public class BeamJoinRelUnboundedVsBoundedTest extends 
BaseRelTest {
   }
 
   @Test
+  public void testNodeStatsEstimation() { // windowed aggregate joined with ORDER_DETAILS1
+    String sql =
+        "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM "
+            + "(select order_id, sum(site_id) as sum_site_id FROM 
ORDER_DETAILS "
+            + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' 
HOUR)) o1 "
+            + " JOIN "
+            + " ORDER_DETAILS1 o2 "
+            + " on "
+            + " o1.order_id=o2.order_id";
+    // Plan the query, then descend through first inputs until the join node.
+    RelNode root = env.parseQuery(sql);
+
+    while (!(root instanceof BeamJoinRel)) {
+      root = root.getInput(0);
+    }
+    // Estimates for the join itself and for its left and right inputs.
+    NodeStats estimate = BeamSqlRelUtils.getNodeStats(root, 
root.getCluster().getMetadataQuery());
+    NodeStats leftEstimate =
+        BeamSqlRelUtils.getNodeStats(
+            ((BeamJoinRel) root).getLeft(), 
root.getCluster().getMetadataQuery());
+    NodeStats rightEstimate =
+        BeamSqlRelUtils.getNodeStats(
+            ((BeamJoinRel) root).getRight(), 
root.getCluster().getMetadataQuery());
+    // The estimate must be known and must report a row count of 0.
+    Assert.assertFalse(estimate.isUnknown());
+    Assert.assertEquals(0d, estimate.getRowCount(), 0.01);
+    // NOTE(review): the unbounded-vs-unbounded test bounds the rate with getRate(); confirm.
+    Assert.assertNotEquals(0d, estimate.getRate(), 0.001);
+    Assert.assertTrue(
+        estimate.getRate()
+            < leftEstimate.getRowCount() * rightEstimate.getWindow()
+                + rightEstimate.getRowCount() * leftEstimate.getWindow());
+    // Window: non-zero and strictly below the product of the input windows.
+    Assert.assertNotEquals(0d, estimate.getWindow(), 0.001);
+    Assert.assertTrue(estimate.getWindow() < leftEstimate.getWindow() * 
rightEstimate.getWindow());
+  }
+
+  @Test
   public void testLeftOuterJoin() throws Exception {
     String sql =
         "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM "
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java
index 869b1b8..1f19cf8 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java
@@ -18,6 +18,8 @@
 package org.apache.beam.sdk.extensions.sql.impl.rel;
 
 import org.apache.beam.sdk.extensions.sql.TestUtils;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
 import 
org.apache.beam.sdk.extensions.sql.impl.transform.BeamSqlOutputToConsoleFn;
 import 
org.apache.beam.sdk.extensions.sql.meta.provider.test.TestUnboundedTable;
 import org.apache.beam.sdk.schemas.Schema;
@@ -26,8 +28,10 @@ import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.Row;
+import org.apache.calcite.rel.RelNode;
 import org.joda.time.DateTime;
 import org.joda.time.Duration;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
@@ -35,8 +39,8 @@ import org.junit.Test;
 /** Unbounded + Unbounded Test for {@code BeamJoinRel}. */
 public class BeamJoinRelUnboundedVsUnboundedTest extends BaseRelTest {
   @Rule public final TestPipeline pipeline = TestPipeline.create();
-  public static final DateTime FIRST_DATE = new DateTime(1);
-  public static final DateTime SECOND_DATE = new DateTime(1 + 3600 * 1000);
+  private static final DateTime FIRST_DATE = new DateTime(1);
+  private static final DateTime SECOND_DATE = new DateTime(1 + 3600 * 1000);
 
   private static final Duration WINDOW_SIZE = Duration.standardHours(1);
 
@@ -72,7 +76,8 @@ public class BeamJoinRelUnboundedVsUnboundedTest extends 
BaseRelTest {
                 2,
                 3,
                 3,
-                SECOND_DATE));
+                SECOND_DATE)
+            
.setStatistics(BeamTableStatistics.createUnboundedTableStatistics(3d)));
   }
 
   @Test
@@ -103,6 +108,45 @@ public class BeamJoinRelUnboundedVsUnboundedTest extends 
BaseRelTest {
   }
 
   @Test
+  public void testNodeStatsEstimation() { // join of two windowed aggregates of ORDER_DETAILS
+    String sql =
+        "SELECT * FROM "
+            + "(select order_id, sum(site_id) as sum_site_id FROM 
ORDER_DETAILS "
+            + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' 
HOUR)) o1 "
+            + " JOIN "
+            + "(select order_id, sum(site_id) as sum_site_id FROM 
ORDER_DETAILS "
+            + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' 
HOUR)) o2 "
+            + " on "
+            + " o1.order_id=o2.order_id";
+    // Plan the query, then descend through first inputs until the join node.
+    RelNode root = env.parseQuery(sql);
+
+    while (!(root instanceof BeamJoinRel)) {
+      root = root.getInput(0);
+    }
+    // Estimates for the join itself and for its left and right inputs.
+    NodeStats estimate = BeamSqlRelUtils.getNodeStats(root, 
root.getCluster().getMetadataQuery());
+    NodeStats leftEstimate =
+        BeamSqlRelUtils.getNodeStats(
+            ((BeamJoinRel) root).getLeft(), 
root.getCluster().getMetadataQuery());
+    NodeStats rightEstimate =
+        BeamSqlRelUtils.getNodeStats(
+            ((BeamJoinRel) root).getRight(), 
root.getCluster().getMetadataQuery());
+    // The estimate must be known and must report a row count of 0.
+    Assert.assertFalse(estimate.isUnknown());
+    Assert.assertEquals(0d, estimate.getRowCount(), 0.01);
+    // Rate: non-zero and below left.rate * right.window + right.rate * left.window.
+    Assert.assertNotEquals(0d, estimate.getRate(), 0.001);
+    Assert.assertTrue(
+        estimate.getRate()
+            < leftEstimate.getRate() * rightEstimate.getWindow()
+                + rightEstimate.getRate() * leftEstimate.getWindow());
+    // Window: non-zero and strictly below the product of the input windows.
+    Assert.assertNotEquals(0d, estimate.getWindow(), 0.001);
+    Assert.assertTrue(estimate.getWindow() < leftEstimate.getWindow() * 
rightEstimate.getWindow());
+  }
+
+  @Test
   public void testLeftOuterJoin() throws Exception {
     String sql =
         "SELECT * FROM "
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java
index 6484360..c29eeb2 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java
@@ -19,12 +19,19 @@ package org.apache.beam.sdk.extensions.sql.impl.rel;
 
 import java.math.BigDecimal;
 import org.apache.beam.sdk.extensions.sql.TestUtils;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
 import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
+import 
org.apache.beam.sdk.extensions.sql.meta.provider.test.TestUnboundedTable;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.Row;
+import org.apache.calcite.rel.RelNode;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
@@ -33,6 +40,11 @@ import org.junit.Test;
 public class BeamMinusRelTest extends BaseRelTest {
   @Rule public final TestPipeline pipeline = TestPipeline.create();
 
+  private static final DateTime FIRST_DATE = new DateTime(1);
+  private static final DateTime SECOND_DATE = new DateTime(1 + 3600 * 1000);
+
+  private static final Duration WINDOW_SIZE = Duration.standardHours(1);
+
   @BeforeClass
   public static void prepare() {
     registerTable(
@@ -74,10 +86,43 @@ public class BeamMinusRelTest extends BaseRelTest {
                 3L,
                 3,
                 new BigDecimal(3.0)));
+
+    registerTable(
+        "ORDER_DETAILS_UNBOUNDED",
+        TestUnboundedTable.of(
+                Schema.FieldType.INT32, "order_id",
+                Schema.FieldType.INT32, "site_id",
+                Schema.FieldType.INT32, "price",
+                Schema.FieldType.DATETIME, "order_time")
+            .timestampColumnIndex(3)
+            .addRows(Duration.ZERO, 1, 1, 1, FIRST_DATE, 1, 2, 6, FIRST_DATE)
+            .addRows(
+                WINDOW_SIZE.plus(Duration.standardMinutes(1)),
+                2,
+                2,
+                7,
+                SECOND_DATE,
+                2,
+                3,
+                8,
+                SECOND_DATE,
+                // this late record is omitted(First window)
+                1,
+                3,
+                3,
+                FIRST_DATE)
+            .addRows(
+                // this late record is omitted(Second window)
+                
WINDOW_SIZE.plus(WINDOW_SIZE).plus(Duration.standardMinutes(1)),
+                2,
+                3,
+                3,
+                SECOND_DATE)
+            
.setStatistics(BeamTableStatistics.createUnboundedTableStatistics(4d)));
   }
 
   @Test
-  public void testExcept() throws Exception {
+  public void testExcept() {
     String sql = "";
     sql +=
         "SELECT order_id, site_id, price "
@@ -100,7 +145,7 @@ public class BeamMinusRelTest extends BaseRelTest {
   }
 
   @Test
-  public void testExceptAll() throws Exception {
+  public void testExceptAll() {
     String sql = "";
     sql +=
         "SELECT order_id, site_id, price "
@@ -134,7 +179,7 @@ public class BeamMinusRelTest extends BaseRelTest {
   }
 
   @Test
-  public void testExceptRemovesDuplicates() throws Exception {
+  public void testExceptRemovesDuplicates() {
     String sql = "(SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 1) EXCEPT 
SELECT 1";
 
     PCollection<Row> rows = compilePipeline(sql, pipeline);
@@ -146,4 +191,52 @@ public class BeamMinusRelTest extends BaseRelTest {
 
     pipeline.run();
   }
+
+  @Test // NodeStats of the EXCEPT ALL node over ORDER_DETAILS1 and ORDER_DETAILS2.
+  public void testNodeStatsEstimation() {
+    String sql =
+        "SELECT order_id, site_id, price "
+            + "FROM ORDER_DETAILS1 "
+            + " EXCEPT ALL "
+            + "SELECT order_id, site_id, price "
+            + "FROM ORDER_DETAILS2 ";
+    // Plan the query, then descend through first inputs until the minus node.
+    RelNode root = env.parseQuery(sql);
+
+    while (!(root instanceof BeamMinusRel)) {
+      root = root.getInput(0);
+    }
+
+    NodeStats estimate = BeamSqlRelUtils.getNodeStats(root, 
root.getCluster().getMetadataQuery());
+    // The estimate must be known and must report a rate of 0.
+    Assert.assertFalse(estimate.isUnknown());
+    Assert.assertEquals(0d, estimate.getRate(), 0.01);
+    // Row count and window are both expected to be 5 - 3/2 = 3.5.
+    Assert.assertEquals(5. - 3. / 2., estimate.getRowCount(), 0.01);
+    Assert.assertEquals(5. - 3. / 2., estimate.getWindow(), 0.01);
+  }
+
+  @Test // NodeStats of EXCEPT ALL between two windowed aggregates of the unbounded table.
+  public void testNodeStatsEstimationUnbounded() {
+    String sql =
+        "SELECT * "
+            + "FROM "
+            + "(select order_id FROM ORDER_DETAILS_UNBOUNDED "
+            + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' 
HOUR)) o1 "
+            + " EXCEPT ALL "
+            + " select order_id FROM ORDER_DETAILS_UNBOUNDED "
+            + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' 
HOUR) ";
+    // Plan the query, then descend through first inputs until the minus node.
+    RelNode root = env.parseQuery(sql);
+
+    while (!(root instanceof BeamMinusRel)) {
+      root = root.getInput(0);
+    }
+
+    NodeStats estimate = BeamSqlRelUtils.getNodeStats(root, 
root.getCluster().getMetadataQuery());
+    // Expected rate is 4/2 - 4/4 = 1, where 4 is the rate registered for the table.
+    // Note that both inputs go through a GROUP BY before the EXCEPT ALL.
+    Assert.assertEquals(4d / 2 - 4d / 4, estimate.getRate(), 0.01);
+    Assert.assertEquals(0d, estimate.getRowCount(), 0.01);
+  }
 }
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java
index 3e058a2..15cf8cb 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java
@@ -18,13 +18,16 @@
 package org.apache.beam.sdk.extensions.sql.impl.rel;
 
 import org.apache.beam.sdk.extensions.sql.TestUtils;
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
 import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.Row;
+import org.apache.calcite.rel.RelNode;
 import org.joda.time.DateTime;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -102,7 +105,7 @@ public class BeamSortRelTest extends BaseRelTest {
   }
 
   @Test
-  public void testOrderBy_basic() throws Exception {
+  public void testOrderBy_basic() {
     String sql =
         "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price)  SELECT "
             + " order_id, site_id, price "
@@ -122,7 +125,7 @@ public class BeamSortRelTest extends BaseRelTest {
   }
 
   @Test
-  public void testOrderBy_timestamp() throws Exception {
+  public void testOrderBy_timestamp() {
     String sql =
         "SELECT order_id, site_id, price, order_time "
             + "FROM ORDER_DETAILS "
@@ -158,7 +161,7 @@ public class BeamSortRelTest extends BaseRelTest {
   }
 
   @Test
-  public void testOrderBy_nullsFirst() throws Exception {
+  public void testOrderBy_nullsFirst() {
     Schema schema =
         Schema.builder()
             .addField("order_id", Schema.FieldType.INT64)
@@ -188,7 +191,7 @@ public class BeamSortRelTest extends BaseRelTest {
   }
 
   @Test
-  public void testOrderBy_nullsLast() throws Exception {
+  public void testOrderBy_nullsLast() {
     Schema schema =
         Schema.builder()
             .addField("order_id", Schema.FieldType.INT64)
@@ -218,7 +221,7 @@ public class BeamSortRelTest extends BaseRelTest {
   }
 
   @Test
-  public void testOrderBy_with_offset2() throws Exception {
+  public void testOrderBy_with_offset2() {
     Schema schema = Schema.builder().addField("count_star", 
Schema.FieldType.INT64).build();
 
     String sql =
@@ -232,7 +235,7 @@ public class BeamSortRelTest extends BaseRelTest {
   }
 
   @Test
-  public void testOrderBy_with_offset() throws Exception {
+  public void testOrderBy_with_offset() {
     String sql =
         "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price)  SELECT "
             + " order_id, site_id, price "
@@ -252,7 +255,7 @@ public class BeamSortRelTest extends BaseRelTest {
   }
 
   @Test
-  public void testOrderBy_bigFetch() throws Exception {
+  public void testOrderBy_bigFetch() {
     String sql =
         "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price)  SELECT "
             + " order_id, site_id, price "
@@ -288,4 +291,26 @@ public class BeamSortRelTest extends BaseRelTest {
     TestPipeline pipeline = TestPipeline.create();
     compilePipeline(sql, pipeline);
   }
+
+  @Test // NodeStats of the sort node for an ORDER BY ... LIMIT query over ORDER_DETAILS.
+  public void testNodeStatsEstimation() {
+    String sql =
+        "SELECT order_id, site_id, price, order_time "
+            + "FROM ORDER_DETAILS "
+            + "ORDER BY order_time asc limit 11";
+    // Plan the query, then descend through first inputs until the sort node.
+    RelNode root = env.parseQuery(sql);
+
+    while (!(root instanceof BeamSortRel)) {
+      root = root.getInput(0);
+    }
+
+    NodeStats estimate = BeamSqlRelUtils.getNodeStats(root, 
root.getCluster().getMetadataQuery());
+    // The estimate must be known and must report a rate of 0.
+    Assert.assertFalse(estimate.isUnknown());
+    Assert.assertEquals(0d, estimate.getRate(), 0.01);
+    // NOTE(review): LIMIT is 11 but 10 is expected; presumably the table has 10 rows. Confirm.
+    Assert.assertEquals(10., estimate.getRowCount(), 0.01);
+    Assert.assertEquals(10., estimate.getWindow(), 0.01);
+  }
 }
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUncollectRelTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUncollectRelTest.java
new file mode 100644
index 0000000..2992c72
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUncollectRelTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
+import org.apache.calcite.rel.RelNode;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Tests for the node statistics reported for {@code BeamUncollectRel}. */
+public class BeamUncollectRelTest extends BaseRelTest {
+  private NodeStats getEstimateOf(String sql) { // stats of the first uncollect node in the plan
+    RelNode root = env.parseQuery(sql);
+    // Descend through first inputs until the uncollect node is reached.
+    while (!(root instanceof BeamUncollectRel)) {
+      root = root.getInput(0);
+    }
+
+    return BeamSqlRelUtils.getNodeStats(root, 
root.getCluster().getMetadataQuery());
+  }
+
+  @Test // UNNEST over two literal three-element arrays.
+  public void testNodeStats() {
+    NodeStats estimate =
+        getEstimateOf(
+            "SELECT * FROM UNNEST (SELECT * FROM (VALUES (ARRAY ['a', 'b', 
'c']),(ARRAY ['a', 'b', 'c']))) t1");
+    // Expected: 4 rows, a window of 4, and a rate of 0 (an estimate, not the 6 literal elements).
+    Assert.assertEquals(4d, estimate.getRowCount(), 0.001);
+    Assert.assertEquals(4d, estimate.getWindow(), 0.001);
+    Assert.assertEquals(0., estimate.getRate(), 0.001);
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRelTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRelTest.java
index b7ee1a8..3ed476d 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRelTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRelTest.java
@@ -19,12 +19,15 @@ package org.apache.beam.sdk.extensions.sql.impl.rel;
 
 import java.math.BigDecimal;
 import org.apache.beam.sdk.extensions.sql.TestUtils;
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
 import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.Row;
+import org.apache.calcite.rel.RelNode;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
@@ -99,4 +102,54 @@ public class BeamUnionRelTest extends BaseRelTest {
                 .getRows());
     pipeline.run();
   }
+
+  @Test
+  public void testNodeStatsEstimation() { // plain UNION (no ALL) of ORDER_DETAILS with itself
+    String sql =
+        "SELECT "
+            + " order_id, site_id, price "
+            + "FROM ORDER_DETAILS "
+            + " UNION SELECT "
+            + " order_id, site_id, price "
+            + "FROM ORDER_DETAILS ";
+
+    RelNode root = env.parseQuery(sql);
+
+    while (!(root instanceof BeamUnionRel)) { // follow first inputs to the union node
+      root = root.getInput(0);
+    }
+
+    NodeStats estimate = BeamSqlRelUtils.getNodeStats(root, root.getCluster().getMetadataQuery());
+
+    Assert.assertFalse(estimate.isUnknown());
+    Assert.assertEquals(0d, estimate.getRate(), 0.01); // last argument is the tolerance
+
+    Assert.assertEquals(2., estimate.getRowCount(), 0.01);
+    Assert.assertEquals(2., estimate.getWindow(), 0.01);
+  }
+
+  @Test
+  public void testNodeStatsEstimationUnionAll() { // UNION ALL of ORDER_DETAILS with itself
+    String sql =
+        "SELECT "
+            + " order_id, site_id, price "
+            + "FROM ORDER_DETAILS "
+            + " UNION ALL SELECT "
+            + " order_id, site_id, price "
+            + "FROM ORDER_DETAILS ";
+
+    RelNode root = env.parseQuery(sql);
+
+    while (!(root instanceof BeamUnionRel)) { // follow first inputs to the union node
+      root = root.getInput(0);
+    }
+
+    NodeStats estimate = BeamSqlRelUtils.getNodeStats(root, root.getCluster().getMetadataQuery());
+
+    Assert.assertFalse(estimate.isUnknown());
+    Assert.assertEquals(0d, estimate.getRate(), 0.01); // last argument is the tolerance
+
+    Assert.assertEquals(4., estimate.getRowCount(), 0.01);
+    Assert.assertEquals(4., estimate.getWindow(), 0.01);
+  }
 }
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRelTest.java
index 0dc8e26..065b558 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRelTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRelTest.java
@@ -18,12 +18,15 @@
 package org.apache.beam.sdk.extensions.sql.impl.rel;
 
 import org.apache.beam.sdk.extensions.sql.TestUtils;
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
 import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.Row;
+import org.apache.calcite.rel.RelNode;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
@@ -104,4 +107,25 @@ public class BeamValuesRelTest extends BaseRelTest {
                 .getRows());
     pipeline.run();
   }
+
+  @Test
+  public void testNodeStatsEstimation() { // VALUES list with nine single-column rows
+    String sql =
+        "SELECT * FROM (VALUES ('value1'),('value2'),('value3'),('value4'),('value5'),"
+            + " ('value6'),('value7'),('value8'),('value9'))";
+
+    RelNode root = env.parseQuery(sql);
+
+    while (!(root instanceof BeamValuesRel)) { // follow first inputs to the values node
+      root = root.getInput(0);
+    }
+
+    NodeStats estimate = BeamSqlRelUtils.getNodeStats(root, root.getCluster().getMetadataQuery());
+
+    Assert.assertFalse(estimate.isUnknown());
+    Assert.assertEquals(0d, estimate.getRate(), 0.01); // last argument is the tolerance
+
+    Assert.assertEquals(9., estimate.getRowCount(), 0.01);
+    Assert.assertEquals(9., estimate.getWindow(), 0.01);
+  }
 }

Reply via email to