thomasrebele commented on code in PR #6293:
URL: https://github.com/apache/hive/pull/6293#discussion_r2910922019


##########
ql/src/test/org/apache/hadoop/hive/ql/optimizer/calcite/stats/TestFilterSelectivityEstimator.java:
##########
@@ -605,7 +611,36 @@ public void testComputeRangePredicateSelectivityNotBetweenWithNULLS() {
   }
 
   @Test
-  public void testRangePredicateCastInteger() {
+  public void testRangePredicateCastIntegerValuesInsideTypeRange() {
+    // use VALUES2, even if the tested types cannot represent its values
+    // we're only interested in whether the cast to a smaller integer type results in the default selectivity

Review Comment:
   Method will get removed.



##########
ql/src/test/org/apache/hadoop/hive/ql/optimizer/calcite/stats/TestFilterSelectivityEstimator.java:
##########
@@ -511,6 +605,428 @@ public void testComputeRangePredicateSelectivityNotBetweenWithNULLS() {
     doReturn(Collections.singletonList(stats)).when(tableMock).getColStat(Collections.singletonList(0));
     RexNode filter = REX_BUILDER.makeCall(HiveBetween.INSTANCE, boolTrue, inputRef0, int1, int3);
     FilterSelectivityEstimator estimator = new FilterSelectivityEstimator(scan, mq);
-    Assert.assertEquals(0.55, estimator.estimateSelectivity(filter), DELTA);
+    // only the values 4, 5, 6, 7 fulfill the condition NOT BETWEEN 1 AND 3
+    // (the NULL values do not fulfill the condition)
+    Assert.assertEquals(0.2, estimator.estimateSelectivity(filter), DELTA);
+  }
+
+  @Test
+  public void testRangePredicateCastIntegerValuesInsideTypeRange() {
+    // use VALUES2, even if the tested types cannot represent its values
+    // we're only interested in whether the cast to a smaller integer type results in the default selectivity
+    useFieldWithValues("f_tinyint", VALUES, KLL);
+    checkSelectivity(3 / 13.f, ge(cast("f_tinyint", TINYINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_tinyint", SMALLINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_tinyint", INTEGER), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_tinyint", BIGINT), int5));
+
+    useFieldWithValues("f_smallint", VALUES, KLL);
+    checkSelectivity(3 / 13.f, ge(cast("f_smallint", TINYINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_smallint", SMALLINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_smallint", INTEGER), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_smallint", BIGINT), int5));
+
+    useFieldWithValues("f_integer", VALUES, KLL);
+    checkSelectivity(3 / 13.f, ge(cast("f_integer", TINYINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_integer", SMALLINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_integer", INTEGER), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_integer", BIGINT), int5));
+
+    useFieldWithValues("f_bigint", VALUES, KLL);
+    checkSelectivity(3 / 13.f, ge(cast("f_bigint", TINYINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_bigint", SMALLINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_bigint", INTEGER), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_bigint", BIGINT), int5));
+  }
+
+  @Test
+  public void testRangePredicateCastIntegerValuesOutsideTypeRange() {
+    // use VALUES2, even if the tested types cannot represent its values
+    // we're only interested in whether the cast to a smaller integer type results in the default selectivity
+    useFieldWithValues("f_tinyint", VALUES2, KLL2);
+    checkSelectivity(16 / 28.f, ge(cast("f_tinyint", TINYINT), int5));
+    checkSelectivity(18 / 28.f, ge(cast("f_tinyint", SMALLINT), int5));
+    checkSelectivity(20 / 28.f, ge(cast("f_tinyint", INTEGER), int5));
+    checkSelectivity(20 / 28.f, ge(cast("f_tinyint", BIGINT), int5));
+
+    useFieldWithValues("f_smallint", VALUES2, KLL2);
+    checkSelectivity(1 / 3.f, ge(cast("f_smallint", TINYINT), int5));
+    checkSelectivity(18 / 28.f, ge(cast("f_smallint", SMALLINT), int5));
+    checkSelectivity(20 / 28.f, ge(cast("f_smallint", INTEGER), int5));
+    checkSelectivity(20 / 28.f, ge(cast("f_smallint", BIGINT), int5));
+
+    useFieldWithValues("f_integer", VALUES2, KLL2);
+    checkSelectivity(1 / 3.f, ge(cast("f_integer", TINYINT), int5));
+    checkSelectivity(1 / 3.f, ge(cast("f_integer", SMALLINT), int5));
+    checkSelectivity(20 / 28.f, ge(cast("f_integer", INTEGER), int5));
+    checkSelectivity(20 / 28.f, ge(cast("f_integer", BIGINT), int5));
+
+    useFieldWithValues("f_bigint", VALUES2, KLL2);
+    checkSelectivity(1 / 3.f, ge(cast("f_bigint", TINYINT), int5));
+    checkSelectivity(1 / 3.f, ge(cast("f_bigint", SMALLINT), int5));
+    checkSelectivity(1 / 3.f, ge(cast("f_bigint", INTEGER), int5));
+    checkSelectivity(20 / 28.f, ge(cast("f_bigint", BIGINT), int5));
+  }
+
+  @Test
+  public void testRangePredicateTypeMatrix() {
+    // checks many possible combinations of types
+    List<RelDataTypeField> fields = tableType.getFieldList();
+    for (var srcField : fields) {
+      if (isTemporal(srcField.getType())) {
+        continue;
+      }
+
+      useFieldWithValues(srcField.getName(), VALUES, KLL);
+
+      for (var tgt : fields) {
+        try {
+          if (isTemporal(tgt.getType())) {
+            continue;
+          }
+
+          RexNode expr = cast(srcField.getName(), tgt.getType());
+          checkBetweenSelectivity(3, VALUES.length, VALUES.length, expr, 5, 7);
+        } catch (AssertionError e) {
+          throw new AssertionError("Error when casting from " + srcField.getType() + " to " + tgt.getType(), e);
+        }
+      }
+    }
+  }
+
+  private boolean isTemporal(RelDataType type) {
+    return type.getSqlTypeName() == TIMESTAMP || type.getSqlTypeName() == SqlTypeName.DATE;
+  }

Review Comment:
   I'll drop it.



##########
ql/src/test/org/apache/hadoop/hive/ql/optimizer/calcite/stats/TestFilterSelectivityEstimator.java:
##########
@@ -605,7 +611,36 @@ public void testComputeRangePredicateSelectivityNotBetweenWithNULLS() {
   }
 
   @Test
-  public void testRangePredicateCastInteger() {
+  public void testRangePredicateCastIntegerValuesInsideTypeRange() {
+    // use VALUES2, even if the tested types cannot represent its values
+    // we're only interested in whether the cast to a smaller integer type results in the default selectivity
+    useFieldWithValues("f_tinyint", VALUES, KLL);
+    checkSelectivity(3 / 13.f, ge(cast("f_tinyint", TINYINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_tinyint", SMALLINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_tinyint", INTEGER), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_tinyint", BIGINT), int5));
+
+    useFieldWithValues("f_smallint", VALUES, KLL);
+    checkSelectivity(3 / 13.f, ge(cast("f_smallint", TINYINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_smallint", SMALLINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_smallint", INTEGER), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_smallint", BIGINT), int5));
+
+    useFieldWithValues("f_integer", VALUES, KLL);
+    checkSelectivity(3 / 13.f, ge(cast("f_integer", TINYINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_integer", SMALLINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_integer", INTEGER), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_integer", BIGINT), int5));
+
+    useFieldWithValues("f_bigint", VALUES, KLL);
+    checkSelectivity(3 / 13.f, ge(cast("f_bigint", TINYINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_bigint", SMALLINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_bigint", INTEGER), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_bigint", BIGINT), int5));

Review Comment:
   Ok, I'll combine them.



##########
ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/FilterSelectivityEstimator.java:
##########
@@ -226,43 +247,41 @@ private boolean isRemovableCast(RexNode exp, HiveTableScan tableScan) {
       return false;
     }
 
-    SqlTypeName type = cast.getType().getSqlTypeName();
 
-    double min;
-    double max;
-    switch (type) {
-    case TINYINT, SMALLINT, INTEGER, BIGINT:
-      min = ((Number) type.getLimit(false, SqlTypeName.Limit.OVERFLOW, false, -1, -1)).doubleValue();
-      max = ((Number) type.getLimit(true, SqlTypeName.Limit.OVERFLOW, false, -1, -1)).doubleValue();
-      break;
-    case TIMESTAMP, DATE:
-      min = Long.MIN_VALUE;
-      max = Long.MAX_VALUE;
-      break;
-    case FLOAT:
-      min = -Float.MAX_VALUE;
-      max = Float.MAX_VALUE;
-      break;
-    case DOUBLE, DECIMAL:
-      min = -Double.MAX_VALUE;
-      max = Double.MAX_VALUE;
-      break;
-    default:
-      // unknown type, do not remove the cast
-      return false;
-    }
     // are all values of the input column accepted by the cast?
+    double min = ((Number) targetType.getLimit(false, SqlTypeName.Limit.OVERFLOW, false, -1, -1)).doubleValue();
+    double max = ((Number) targetType.getLimit(true, SqlTypeName.Limit.OVERFLOW, false, -1, -1)).doubleValue();
     return min < colRange.minValue.doubleValue() && colRange.maxValue.doubleValue() < max;
   }
 
   /**
-   * Get the range of values that are rounded to valid values of a DECIMAL type.
+   * Get the range of values that are rounded to valid values of a type.
    *
-   * @param type the DECIMAL type
+   * @param type the type
    * @param lowerBound the lower bound type of the result
    * @param upperBound the upper bound type of the result
    * @return the range of the type
    */
+  private static Range<Float> getRangeOfType(RelDataType type, BoundType lowerBound, BoundType upperBound) {
+    switch (type.getSqlTypeName()) {
+    // in case of integer types,
+    case TINYINT:
+      return Range.closed(-128.99998f, 127.99999f);
+    case SMALLINT:
+      return Range.closed(-32768.996f, 32767.998f);
+    case INTEGER:
+      return Range.closed(-2.1474836E9f, 2.1474836E9f);
+    case BIGINT, DATE, TIMESTAMP:
+      return Range.closed(-9.223372E18f, 9.223372E18f);
+    case DECIMAL:
+      return getRangeOfDecimalType(type, lowerBound, upperBound);

Review Comment:
   Thanks for the clarification, see reply below.
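For readers following the thread: the widened float literals in `getRangeOfType` (e.g. `-128.99998f` / `127.99999f` for TINYINT) account for fractional inputs still landing inside the integer type after the cast. A hypothetical sketch of that idea, assuming the cast truncates toward zero (this is an illustration, not the Hive implementation; the class and method names are made up):

```java
// Hypothetical sketch: assuming CAST from a fractional value truncates toward
// zero, any value in the open interval (min - 1, max + 1) still lands inside
// the integer type after truncation, so the acceptance range is slightly wider
// than [min, max].
public class IntegerCastRangeSketch {
  // widest float below the type minimum that still truncates into range
  static float lowerAcceptance(long min) {
    return Math.nextUp((float) min - 1.0f);
  }

  // widest float above the type maximum that still truncates into range
  static float upperAcceptance(long max) {
    return Math.nextDown((float) max + 1.0f);
  }

  public static void main(String[] args) {
    // roughly matches the -128.99998f / 127.99999f literals in the patch
    System.out.println(lowerAcceptance(Byte.MIN_VALUE));
    System.out.println(upperAcceptance(Byte.MAX_VALUE));
  }
}
```

Under this assumption, `lowerAcceptance(-128)` is just above `-129` and truncates back to `-128`, mirroring why the patch's bounds extend almost a full unit past the integer limits.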



##########
ql/src/test/org/apache/hadoop/hive/ql/optimizer/calcite/stats/TestFilterSelectivityEstimator.java:
##########
@@ -726,7 +761,85 @@ public void testRangePredicateOnDateWithCast() {
   }
 
   @Test
-  public void testBetweenWithCastDecimal2s1() {
+  public void testBetweenWithCastToTinyIntCheckRounding() {

Review Comment:
   I'll merge the two test methods.



##########
ql/src/test/org/apache/hadoop/hive/ql/optimizer/calcite/stats/TestFilterSelectivityEstimator.java:
##########
@@ -511,6 +605,428 @@ public void testComputeRangePredicateSelectivityNotBetweenWithNULLS() {
     doReturn(Collections.singletonList(stats)).when(tableMock).getColStat(Collections.singletonList(0));
     RexNode filter = REX_BUILDER.makeCall(HiveBetween.INSTANCE, boolTrue, inputRef0, int1, int3);
     FilterSelectivityEstimator estimator = new FilterSelectivityEstimator(scan, mq);
-    Assert.assertEquals(0.55, estimator.estimateSelectivity(filter), DELTA);
+    // only the values 4, 5, 6, 7 fulfill the condition NOT BETWEEN 1 AND 3
+    // (the NULL values do not fulfill the condition)
+    Assert.assertEquals(0.2, estimator.estimateSelectivity(filter), DELTA);
+  }
+
+  @Test
+  public void testRangePredicateCastIntegerValuesInsideTypeRange() {
+    // use VALUES2, even if the tested types cannot represent its values
+    // we're only interested in whether the cast to a smaller integer type results in the default selectivity
+    useFieldWithValues("f_tinyint", VALUES, KLL);
+    checkSelectivity(3 / 13.f, ge(cast("f_tinyint", TINYINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_tinyint", SMALLINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_tinyint", INTEGER), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_tinyint", BIGINT), int5));
+
+    useFieldWithValues("f_smallint", VALUES, KLL);
+    checkSelectivity(3 / 13.f, ge(cast("f_smallint", TINYINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_smallint", SMALLINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_smallint", INTEGER), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_smallint", BIGINT), int5));
+
+    useFieldWithValues("f_integer", VALUES, KLL);
+    checkSelectivity(3 / 13.f, ge(cast("f_integer", TINYINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_integer", SMALLINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_integer", INTEGER), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_integer", BIGINT), int5));
+
+    useFieldWithValues("f_bigint", VALUES, KLL);
+    checkSelectivity(3 / 13.f, ge(cast("f_bigint", TINYINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_bigint", SMALLINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_bigint", INTEGER), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_bigint", BIGINT), int5));
+  }
+
+  @Test
+  public void testRangePredicateCastIntegerValuesOutsideTypeRange() {
+    // use VALUES2, even if the tested types cannot represent its values
+    // we're only interested in whether the cast to a smaller integer type results in the default selectivity
+    useFieldWithValues("f_tinyint", VALUES2, KLL2);
+    checkSelectivity(16 / 28.f, ge(cast("f_tinyint", TINYINT), int5));
+    checkSelectivity(18 / 28.f, ge(cast("f_tinyint", SMALLINT), int5));
+    checkSelectivity(20 / 28.f, ge(cast("f_tinyint", INTEGER), int5));
+    checkSelectivity(20 / 28.f, ge(cast("f_tinyint", BIGINT), int5));
+
+    useFieldWithValues("f_smallint", VALUES2, KLL2);
+    checkSelectivity(1 / 3.f, ge(cast("f_smallint", TINYINT), int5));
+    checkSelectivity(18 / 28.f, ge(cast("f_smallint", SMALLINT), int5));
+    checkSelectivity(20 / 28.f, ge(cast("f_smallint", INTEGER), int5));
+    checkSelectivity(20 / 28.f, ge(cast("f_smallint", BIGINT), int5));
+
+    useFieldWithValues("f_integer", VALUES2, KLL2);
+    checkSelectivity(1 / 3.f, ge(cast("f_integer", TINYINT), int5));
+    checkSelectivity(1 / 3.f, ge(cast("f_integer", SMALLINT), int5));
+    checkSelectivity(20 / 28.f, ge(cast("f_integer", INTEGER), int5));
+    checkSelectivity(20 / 28.f, ge(cast("f_integer", BIGINT), int5));
+
+    useFieldWithValues("f_bigint", VALUES2, KLL2);
+    checkSelectivity(1 / 3.f, ge(cast("f_bigint", TINYINT), int5));
+    checkSelectivity(1 / 3.f, ge(cast("f_bigint", SMALLINT), int5));
+    checkSelectivity(1 / 3.f, ge(cast("f_bigint", INTEGER), int5));
+    checkSelectivity(20 / 28.f, ge(cast("f_bigint", BIGINT), int5));
+  }
+
+  @Test
+  public void testRangePredicateTypeMatrix() {

Review Comment:
   I'll integrate testRangePredicateCastIntegerValuesInsideTypeRange and rename the method to testCastMatrix.



##########
ql/src/test/org/apache/hadoop/hive/ql/optimizer/calcite/stats/TestFilterSelectivityEstimator.java:
##########
@@ -511,6 +605,428 @@ public void testComputeRangePredicateSelectivityNotBetweenWithNULLS() {
     doReturn(Collections.singletonList(stats)).when(tableMock).getColStat(Collections.singletonList(0));
     RexNode filter = REX_BUILDER.makeCall(HiveBetween.INSTANCE, boolTrue, inputRef0, int1, int3);
     FilterSelectivityEstimator estimator = new FilterSelectivityEstimator(scan, mq);
-    Assert.assertEquals(0.55, estimator.estimateSelectivity(filter), DELTA);
+    // only the values 4, 5, 6, 7 fulfill the condition NOT BETWEEN 1 AND 3
+    // (the NULL values do not fulfill the condition)
+    Assert.assertEquals(0.2, estimator.estimateSelectivity(filter), DELTA);
+  }
+
+  @Test
+  public void testRangePredicateCastIntegerValuesInsideTypeRange() {
+    // use VALUES2, even if the tested types cannot represent its values
+    // we're only interested in whether the cast to a smaller integer type results in the default selectivity
+    useFieldWithValues("f_tinyint", VALUES, KLL);
+    checkSelectivity(3 / 13.f, ge(cast("f_tinyint", TINYINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_tinyint", SMALLINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_tinyint", INTEGER), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_tinyint", BIGINT), int5));
+
+    useFieldWithValues("f_smallint", VALUES, KLL);
+    checkSelectivity(3 / 13.f, ge(cast("f_smallint", TINYINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_smallint", SMALLINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_smallint", INTEGER), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_smallint", BIGINT), int5));
+
+    useFieldWithValues("f_integer", VALUES, KLL);
+    checkSelectivity(3 / 13.f, ge(cast("f_integer", TINYINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_integer", SMALLINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_integer", INTEGER), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_integer", BIGINT), int5));
+
+    useFieldWithValues("f_bigint", VALUES, KLL);
+    checkSelectivity(3 / 13.f, ge(cast("f_bigint", TINYINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_bigint", SMALLINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_bigint", INTEGER), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_bigint", BIGINT), int5));
+  }
+
+  @Test
+  public void testRangePredicateCastIntegerValuesOutsideTypeRange() {
+    // use VALUES2, even if the tested types cannot represent its values
+    // we're only interested in whether the cast to a smaller integer type results in the default selectivity
+    useFieldWithValues("f_tinyint", VALUES2, KLL2);
+    checkSelectivity(16 / 28.f, ge(cast("f_tinyint", TINYINT), int5));
+    checkSelectivity(18 / 28.f, ge(cast("f_tinyint", SMALLINT), int5));
+    checkSelectivity(20 / 28.f, ge(cast("f_tinyint", INTEGER), int5));
+    checkSelectivity(20 / 28.f, ge(cast("f_tinyint", BIGINT), int5));
+
+    useFieldWithValues("f_smallint", VALUES2, KLL2);
+    checkSelectivity(1 / 3.f, ge(cast("f_smallint", TINYINT), int5));
+    checkSelectivity(18 / 28.f, ge(cast("f_smallint", SMALLINT), int5));
+    checkSelectivity(20 / 28.f, ge(cast("f_smallint", INTEGER), int5));
+    checkSelectivity(20 / 28.f, ge(cast("f_smallint", BIGINT), int5));
+
+    useFieldWithValues("f_integer", VALUES2, KLL2);
+    checkSelectivity(1 / 3.f, ge(cast("f_integer", TINYINT), int5));
+    checkSelectivity(1 / 3.f, ge(cast("f_integer", SMALLINT), int5));
+    checkSelectivity(20 / 28.f, ge(cast("f_integer", INTEGER), int5));
+    checkSelectivity(20 / 28.f, ge(cast("f_integer", BIGINT), int5));
+
+    useFieldWithValues("f_bigint", VALUES2, KLL2);
+    checkSelectivity(1 / 3.f, ge(cast("f_bigint", TINYINT), int5));
+    checkSelectivity(1 / 3.f, ge(cast("f_bigint", SMALLINT), int5));
+    checkSelectivity(1 / 3.f, ge(cast("f_bigint", INTEGER), int5));
+    checkSelectivity(20 / 28.f, ge(cast("f_bigint", BIGINT), int5));
+  }
+
+  @Test
+  public void testRangePredicateTypeMatrix() {
+    // checks many possible combinations of types
+    List<RelDataTypeField> fields = tableType.getFieldList();
+    for (var srcField : fields) {
+      if (isTemporal(srcField.getType())) {
+        continue;
+      }
+
+      useFieldWithValues(srcField.getName(), VALUES, KLL);
+
+      for (var tgt : fields) {
+        try {
+          if (isTemporal(tgt.getType())) {
+            continue;
+          }
+
+          RexNode expr = cast(srcField.getName(), tgt.getType());
+          checkBetweenSelectivity(3, VALUES.length, VALUES.length, expr, 5, 7);
+        } catch (AssertionError e) {
+          throw new AssertionError("Error when casting from " + 
srcField.getType() + " to " + tgt.getType(), e);
+        }
+      }
+    }
+  }
+
+  private boolean isTemporal(RelDataType type) {
+    return type.getSqlTypeName() == TIMESTAMP || type.getSqlTypeName() == SqlTypeName.DATE;
+  }
+
+  @Test
+  public void testRangePredicateWithCast() {
+    useFieldWithValues("f_numeric", VALUES, KLL);
+    checkSelectivity(3 / 13.f, ge(cast("f_numeric", TINYINT), int5));
+    checkSelectivity(10 / 13.f, lt(cast("f_numeric", TINYINT), int5));
+    checkSelectivity(2 / 13.f, gt(cast("f_numeric", TINYINT), int5));
+    checkSelectivity(11 / 13.f, le(cast("f_numeric", TINYINT), int5));
+
+    checkSelectivity(12 / 13f, ge(cast("f_numeric", TINYINT), int2));
+    checkSelectivity(1 / 13f, lt(cast("f_numeric", TINYINT), int2));
+    checkSelectivity(5 / 13f, gt(cast("f_numeric", TINYINT), int2));
+    checkSelectivity(8 / 13f, le(cast("f_numeric", TINYINT), int2));
+
+    // check some types
+    checkSelectivity(3 / 13.f, ge(cast("f_numeric", INTEGER), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_numeric", SMALLINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_numeric", BIGINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_numeric", SqlTypeName.FLOAT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_numeric", SqlTypeName.DOUBLE), int5));

Review Comment:
   I'll integrate the whole method into the matrix test.
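As an aside for anyone verifying the expected fractions in these tests: with 13 sample values, the selectivity of `col >= 5` is simply the count of matching values over the total. A hypothetical sketch (the value set below is invented to be consistent with the `3 / 13.f` and `12 / 13f` expectations; it is not the actual `VALUES` array from the test class):

```java
// Hypothetical sketch: selectivity of a `>=` predicate over a known value set
// is (number of matching values) / (total number of values).
public class SelectivitySketch {
  static double selectivityGe(int[] values, int bound) {
    long matching = java.util.Arrays.stream(values).filter(v -> v >= bound).count();
    return (double) matching / values.length;
  }

  public static void main(String[] args) {
    // invented 13-value set consistent with the fractions in the test:
    // three values >= 5, twelve values >= 2
    int[] values = {1, 2, 2, 2, 2, 2, 2, 2, 3, 4, 5, 6, 7};
    System.out.println(selectivityGe(values, 5)); // 3/13 under this assumption
    System.out.println(selectivityGe(values, 2)); // 12/13 under this assumption
  }
}
```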



##########
ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/FilterSelectivityEstimator.java:
##########
@@ -184,91 +189,383 @@ public Double visitCall(RexCall call) {
     return selectivity;
   }
 
+  /**
+   * Return whether the expression is a removable cast based on stats and type bounds.
+   *
+   * <p>
+   * In Hive, if a value cannot be represented by the cast, the result of the cast is NULL,
+   * and therefore cannot fulfill the predicate. So the possible range of the values
+   * is limited by the range of possible values of the type.
+   * </p>
+   *
+   * @param exp       the expression to check
+   * @param tableScan the table that provides the statistics
+   * @return true if the expression is a removable cast, false otherwise
+   */
+  private boolean isRemovableCast(RexNode exp, HiveTableScan tableScan) {
+    if(SqlKind.CAST != exp.getKind()) {
+      return false;
+    }
+    RexCall cast = (RexCall) exp;
+    RexNode op0 = cast.getOperands().getFirst();
+    if (!(op0 instanceof RexInputRef)) {
+      return false;
+    }
+    int index = ((RexInputRef) op0).getIndex();
+    final List<ColStatistics> colStats = tableScan.getColStat(Collections.singletonList(index));
+    if (colStats.isEmpty()) {
+      return false;
+    }

Review Comment:
   I'll move it.



##########
ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/FilterSelectivityEstimator.java:
##########
@@ -184,91 +189,383 @@ public Double visitCall(RexCall call) {
     return selectivity;
   }
 
+  /**
+   * Return whether the expression is a removable cast based on stats and type bounds.
+   *
+   * <p>
+   * In Hive, if a value cannot be represented by the cast, the result of the cast is NULL,
+   * and therefore cannot fulfill the predicate. So the possible range of the values
+   * is limited by the range of possible values of the type.
+   * </p>

Review Comment:
   I'll change the javadoc to make the purpose of `isRemovableCast` clearer.
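To illustrate the semantics the javadoc describes: in Hive, a CAST whose input does not fit the target type evaluates to NULL, and a NULL never satisfies a range predicate, which is what makes the cast removable for estimation when all column values fit. A hypothetical sketch of that behavior (illustrative only; class and method names are invented):

```java
import java.util.Optional;

// Hypothetical sketch of Hive-style CAST semantics: an out-of-range value
// becomes NULL (modeled here as Optional.empty) instead of wrapping around,
// so such rows can never satisfy a range predicate on the cast expression.
public class CastNullSketch {
  static Optional<Byte> castToTinyint(long value) {
    if (value < Byte.MIN_VALUE || value > Byte.MAX_VALUE) {
      return Optional.empty(); // NULL in SQL terms
    }
    return Optional.of((byte) value);
  }

  public static void main(String[] args) {
    System.out.println(castToTinyint(100L)); // fits TINYINT
    System.out.println(castToTinyint(300L)); // overflows: NULL, filtered out by any range predicate
  }
}
```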



##########
ql/src/test/org/apache/hadoop/hive/ql/optimizer/calcite/stats/TestFilterSelectivityEstimator.java:
##########
@@ -511,6 +605,428 @@ public void testComputeRangePredicateSelectivityNotBetweenWithNULLS() {
     doReturn(Collections.singletonList(stats)).when(tableMock).getColStat(Collections.singletonList(0));
     RexNode filter = REX_BUILDER.makeCall(HiveBetween.INSTANCE, boolTrue, inputRef0, int1, int3);
     FilterSelectivityEstimator estimator = new FilterSelectivityEstimator(scan, mq);
-    Assert.assertEquals(0.55, estimator.estimateSelectivity(filter), DELTA);
+    // only the values 4, 5, 6, 7 fulfill the condition NOT BETWEEN 1 AND 3
+    // (the NULL values do not fulfill the condition)
+    Assert.assertEquals(0.2, estimator.estimateSelectivity(filter), DELTA);
+  }
+
+  @Test
+  public void testRangePredicateCastIntegerValuesInsideTypeRange() {
+    // use VALUES2, even if the tested types cannot represent its values
+    // we're only interested in whether the cast to a smaller integer type results in the default selectivity
+    useFieldWithValues("f_tinyint", VALUES, KLL);
+    checkSelectivity(3 / 13.f, ge(cast("f_tinyint", TINYINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_tinyint", SMALLINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_tinyint", INTEGER), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_tinyint", BIGINT), int5));
+
+    useFieldWithValues("f_smallint", VALUES, KLL);
+    checkSelectivity(3 / 13.f, ge(cast("f_smallint", TINYINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_smallint", SMALLINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_smallint", INTEGER), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_smallint", BIGINT), int5));
+
+    useFieldWithValues("f_integer", VALUES, KLL);
+    checkSelectivity(3 / 13.f, ge(cast("f_integer", TINYINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_integer", SMALLINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_integer", INTEGER), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_integer", BIGINT), int5));
+
+    useFieldWithValues("f_bigint", VALUES, KLL);
+    checkSelectivity(3 / 13.f, ge(cast("f_bigint", TINYINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_bigint", SMALLINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_bigint", INTEGER), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_bigint", BIGINT), int5));
+  }
+
+  @Test
+  public void testRangePredicateCastIntegerValuesOutsideTypeRange() {
+    // use VALUES2, even if the tested types cannot represent its values
+    // we're only interested in whether the cast to a smaller integer type results in the default selectivity
+    useFieldWithValues("f_tinyint", VALUES2, KLL2);
+    checkSelectivity(16 / 28.f, ge(cast("f_tinyint", TINYINT), int5));
+    checkSelectivity(18 / 28.f, ge(cast("f_tinyint", SMALLINT), int5));
+    checkSelectivity(20 / 28.f, ge(cast("f_tinyint", INTEGER), int5));
+    checkSelectivity(20 / 28.f, ge(cast("f_tinyint", BIGINT), int5));
+
+    useFieldWithValues("f_smallint", VALUES2, KLL2);
+    checkSelectivity(1 / 3.f, ge(cast("f_smallint", TINYINT), int5));
+    checkSelectivity(18 / 28.f, ge(cast("f_smallint", SMALLINT), int5));
+    checkSelectivity(20 / 28.f, ge(cast("f_smallint", INTEGER), int5));
+    checkSelectivity(20 / 28.f, ge(cast("f_smallint", BIGINT), int5));
+
+    useFieldWithValues("f_integer", VALUES2, KLL2);
+    checkSelectivity(1 / 3.f, ge(cast("f_integer", TINYINT), int5));
+    checkSelectivity(1 / 3.f, ge(cast("f_integer", SMALLINT), int5));
+    checkSelectivity(20 / 28.f, ge(cast("f_integer", INTEGER), int5));
+    checkSelectivity(20 / 28.f, ge(cast("f_integer", BIGINT), int5));
+
+    useFieldWithValues("f_bigint", VALUES2, KLL2);
+    checkSelectivity(1 / 3.f, ge(cast("f_bigint", TINYINT), int5));
+    checkSelectivity(1 / 3.f, ge(cast("f_bigint", SMALLINT), int5));
+    checkSelectivity(1 / 3.f, ge(cast("f_bigint", INTEGER), int5));
+    checkSelectivity(20 / 28.f, ge(cast("f_bigint", BIGINT), int5));
+  }
+
+  @Test
+  public void testRangePredicateTypeMatrix() {
+    // checks many possible combinations of types
+    List<RelDataTypeField> fields = tableType.getFieldList();
+    for (var srcField : fields) {
+      if (isTemporal(srcField.getType())) {
+        continue;
+      }

Review Comment:
   I'll fix it.



##########
ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/FilterSelectivityEstimator.java:
##########
@@ -184,91 +189,383 @@ public Double visitCall(RexCall call) {
     return selectivity;
   }
 
+  /**
+   * Return whether the expression is a removable cast based on stats and type bounds.
+   *
+   * <p>
+   * In Hive, if a value cannot be represented by the cast, the result of the cast is NULL,
+   * and therefore cannot fulfill the predicate. So the possible range of the values
+   * is limited by the range of possible values of the type.
+   * </p>
+   *
+   * @param exp       the expression to check
+   * @param tableScan the table that provides the statistics
+   * @return true if the expression is a removable cast, false otherwise
+   */
+  private boolean isRemovableCast(RexNode exp, HiveTableScan tableScan) {
+    if(SqlKind.CAST != exp.getKind()) {
+      return false;
+    }
+    RexCall cast = (RexCall) exp;
+    RexNode op0 = cast.getOperands().getFirst();
+    if (!(op0 instanceof RexInputRef)) {
+      return false;
+    }
+    int index = ((RexInputRef) op0).getIndex();
+    final List<ColStatistics> colStats = tableScan.getColStat(Collections.singletonList(index));
+    if (colStats.isEmpty()) {
+      return false;
+    }
+
+    SqlTypeName sourceType = op0.getType().getSqlTypeName();
+    SqlTypeName targetType = cast.getType().getSqlTypeName();
+
+    switch (sourceType) {
+    case TINYINT, SMALLINT, INTEGER, BIGINT:
+      switch (targetType) {// additional checks are needed
+      case TINYINT, SMALLINT, INTEGER, BIGINT:
+        return isRemovableIntegerCast(cast, op0, colStats);
+      case FLOAT, DOUBLE, DECIMAL:
+        return true;
+      default:
+        return false;
+      }
+    case FLOAT, DOUBLE, DECIMAL:
+      switch (targetType) {
+      // these CASTs do not show a modulo behavior, so it's ok to remove such a cast
+      case TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, DECIMAL:
+        return true;
+      default:
+        return false;
+      }
+    case TIMESTAMP, DATE:
+      switch (targetType) {
+      case TIMESTAMP, DATE:
+        return true;
+      default:
+        return false;
+      }
+      // unknown type, do not remove the cast
+    default:
+      return false;
+    }
+  }
+
+  private static boolean isRemovableIntegerCast(RexCall cast, RexNode op0, List<ColStatistics> colStats) {
+    // If the source type is completely within the target type, the cast is lossless
+    Range<Float> targetRange = getRangeOfType(cast.getType(), BoundType.CLOSED, BoundType.CLOSED);
+    Range<Float> sourceRange = getRangeOfType(op0.getType(), BoundType.CLOSED, BoundType.CLOSED);
+    if (targetRange.encloses(sourceRange)) {
+      return true;
+    }
+
+    // Check that the possible values of the input column are all within the type range of the cast
+    // otherwise the CAST introduces some modulo-like behavior
+    ColStatistics colStat = colStats.getFirst();
+    ColStatistics.Range colRange = colStat.getRange();
+    if (colRange == null || colRange.minValue == null || colRange.maxValue == null) {
+      return false;
+    }
+
+    // are all values of the input column accepted by the cast?
+    SqlTypeName targetType = cast.getType().getSqlTypeName();
+    double min = ((Number) targetType.getLimit(false, SqlTypeName.Limit.OVERFLOW, false, -1, -1)).doubleValue();
+    double max = ((Number) targetType.getLimit(true, SqlTypeName.Limit.OVERFLOW, false, -1, -1)).doubleValue();
+    return min < colRange.minValue.doubleValue() && colRange.maxValue.doubleValue() < max;
+  }
+
+  /**
+   * Get the range of values that are rounded to valid values of a type.
+   *
+   * @param type the type
+   * @param lowerBound the lower bound type of the result
+   * @param upperBound the upper bound type of the result
+   * @return the range of the type
+   */
+  private static Range<Float> getRangeOfType(RelDataType type, BoundType lowerBound, BoundType upperBound) {
+    switch (type.getSqlTypeName()) {
+    // in case of integer types,
+    case TINYINT:
+      return Range.closed(-128.99998f, 127.99999f);
+    case SMALLINT:
+      return Range.closed(-32768.996f, 32767.998f);
+    case INTEGER:
+      return Range.closed(-2.1474836E9f, 2.1474836E9f);
+    case BIGINT, DATE, TIMESTAMP:
+      return Range.closed(-9.223372E18f, 9.223372E18f);
+    case DECIMAL:
+      return getRangeOfDecimalType(type, lowerBound, upperBound);
+    case FLOAT, DOUBLE:
+      return Range.closed(-Float.MAX_VALUE, Float.MAX_VALUE);
+    default:
+      throw new IllegalStateException("Unsupported type: " + type);
+    }
+  }
+
+  private static Range<Float> getRangeOfDecimalType(RelDataType type, BoundType lowerBound, BoundType upperBound) {
+    // values outside the representable range are cast to NULL, so adapt the 
boundaries
+    int digits = type.getPrecision() - type.getScale();
+    // the cast does some rounding, i.e., CAST(99.9499 AS DECIMAL(3,1)) = 99.9
+    // but CAST(99.95 AS DECIMAL(3,1)) = NULL
+    float adjust = (float) (5 * Math.pow(10, -(type.getScale() + 1)));
+    // the range of values supported by the type is interval 
[-typeRangeExtent, typeRangeExtent] (both inclusive)
+    // e.g., the typeRangeExt is 99.94999 for DECIMAL(3,1)
+    float typeRangeExtent = Math.nextDown((float) (Math.pow(10, digits) - 
adjust));
+
+    // the resulting value of +- adjust would be rounded up, so in some cases 
we need to use Math.nextDown
+    boolean lowerInclusive = BoundType.CLOSED.equals(lowerBound);
+    boolean upperInclusive = BoundType.CLOSED.equals(upperBound);
+    float lowerUniverse = lowerInclusive ? -typeRangeExtent : 
Math.nextDown(-typeRangeExtent);
+    float upperUniverse = upperInclusive ? typeRangeExtent : 
Math.nextUp(typeRangeExtent);

Review Comment:
   I see, thanks for the clarification. This has been used previously so that the bounds of the type range are the same as the predicate range. We can use convertRangeToClosedOpen later to ensure that the "BoundType"s are the same.
   
   I'll simplify the code.
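   As a side note, the boundary arithmetic in `getRangeOfDecimalType` can be checked in isolation. Here is a minimal standalone sketch (the class and method names are illustrative, not part of the patch) of how the extent of a `DECIMAL(p, s)` range is derived:

   ```java
   // Standalone sketch of the DECIMAL(p, s) boundary arithmetic quoted above.
   // "digits" and "adjust" mirror the variables in getRangeOfDecimalType.
   public class DecimalRangeSketch {

       // Largest magnitude that still rounds into DECIMAL(precision, scale);
       // anything at or beyond it overflows, so the CAST yields NULL.
       static double typeRangeExtent(int precision, int scale) {
           int digits = precision - scale;                  // digits before the decimal point
           double adjust = 5 * Math.pow(10, -(scale + 1));  // rounding threshold, e.g. 0.05 for scale 1
           return Math.pow(10, digits) - adjust;            // e.g. ~99.95 for DECIMAL(3,1)
       }

       public static void main(String[] args) {
           // CAST(99.9499 AS DECIMAL(3,1)) = 99.9 still fits; CAST(99.95 AS DECIMAL(3,1)) = NULL
           System.out.println(Math.abs(typeRangeExtent(3, 1) - 99.95) < 1e-9);
           // DECIMAL(7,1) accepts magnitudes up to just below 999999.95
           System.out.println(Math.abs(typeRangeExtent(7, 1) - 999999.95) < 1e-6);
       }
   }
   ```

   The production code additionally applies `Math.nextDown` to stay strictly below the rounding threshold in float arithmetic.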



##########
ql/src/test/org/apache/hadoop/hive/ql/optimizer/calcite/stats/TestFilterSelectivityEstimator.java:
##########
@@ -511,6 +605,428 @@ public void testComputeRangePredicateSelectivityNotBetweenWithNULLS() {
     doReturn(Collections.singletonList(stats)).when(tableMock).getColStat(Collections.singletonList(0));
     RexNode filter = REX_BUILDER.makeCall(HiveBetween.INSTANCE, boolTrue, inputRef0, int1, int3);
     FilterSelectivityEstimator estimator = new FilterSelectivityEstimator(scan, mq);
-    Assert.assertEquals(0.55, estimator.estimateSelectivity(filter), DELTA);
+    // only the values 4, 5, 6, 7 fulfill the condition NOT BETWEEN 1 AND 3
+    // (the NULL values do not fulfill the condition)
+    Assert.assertEquals(0.2, estimator.estimateSelectivity(filter), DELTA);
+  }
+
+  @Test
+  public void testRangePredicateCastIntegerValuesInsideTypeRange() {
+    // use VALUES, whose values fit into all of the tested integer types
+    // we're only interested in whether the cast leaves the selectivity unchanged
+    useFieldWithValues("f_tinyint", VALUES, KLL);
+    checkSelectivity(3 / 13.f, ge(cast("f_tinyint", TINYINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_tinyint", SMALLINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_tinyint", INTEGER), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_tinyint", BIGINT), int5));
+
+    useFieldWithValues("f_smallint", VALUES, KLL);
+    checkSelectivity(3 / 13.f, ge(cast("f_smallint", TINYINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_smallint", SMALLINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_smallint", INTEGER), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_smallint", BIGINT), int5));
+
+    useFieldWithValues("f_integer", VALUES, KLL);
+    checkSelectivity(3 / 13.f, ge(cast("f_integer", TINYINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_integer", SMALLINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_integer", INTEGER), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_integer", BIGINT), int5));
+
+    useFieldWithValues("f_bigint", VALUES, KLL);
+    checkSelectivity(3 / 13.f, ge(cast("f_bigint", TINYINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_bigint", SMALLINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_bigint", INTEGER), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_bigint", BIGINT), int5));
+  }
+
+  @Test
+  public void testRangePredicateCastIntegerValuesOutsideTypeRange() {
+    // use VALUES2, even if the tested types cannot represent its values
+    // we're only interested in whether the cast to a smaller integer type results in the default selectivity
+    useFieldWithValues("f_tinyint", VALUES2, KLL2);
+    checkSelectivity(16 / 28.f, ge(cast("f_tinyint", TINYINT), int5));
+    checkSelectivity(18 / 28.f, ge(cast("f_tinyint", SMALLINT), int5));
+    checkSelectivity(20 / 28.f, ge(cast("f_tinyint", INTEGER), int5));
+    checkSelectivity(20 / 28.f, ge(cast("f_tinyint", BIGINT), int5));
+
+    useFieldWithValues("f_smallint", VALUES2, KLL2);
+    checkSelectivity(1 / 3.f, ge(cast("f_smallint", TINYINT), int5));
+    checkSelectivity(18 / 28.f, ge(cast("f_smallint", SMALLINT), int5));
+    checkSelectivity(20 / 28.f, ge(cast("f_smallint", INTEGER), int5));
+    checkSelectivity(20 / 28.f, ge(cast("f_smallint", BIGINT), int5));
+
+    useFieldWithValues("f_integer", VALUES2, KLL2);
+    checkSelectivity(1 / 3.f, ge(cast("f_integer", TINYINT), int5));
+    checkSelectivity(1 / 3.f, ge(cast("f_integer", SMALLINT), int5));
+    checkSelectivity(20 / 28.f, ge(cast("f_integer", INTEGER), int5));
+    checkSelectivity(20 / 28.f, ge(cast("f_integer", BIGINT), int5));
+
+    useFieldWithValues("f_bigint", VALUES2, KLL2);
+    checkSelectivity(1 / 3.f, ge(cast("f_bigint", TINYINT), int5));
+    checkSelectivity(1 / 3.f, ge(cast("f_bigint", SMALLINT), int5));
+    checkSelectivity(1 / 3.f, ge(cast("f_bigint", INTEGER), int5));
+    checkSelectivity(20 / 28.f, ge(cast("f_bigint", BIGINT), int5));
+  }
+
+  @Test
+  public void testRangePredicateTypeMatrix() {
+    // checks many possible combinations of types
+    List<RelDataTypeField> fields = tableType.getFieldList();
+    for (var srcField : fields) {
+      if (isTemporal(srcField.getType())) {
+        continue;
+      }
+
+      useFieldWithValues(srcField.getName(), VALUES, KLL);
+
+      for (var tgt : fields) {
+        try {
+          if (isTemporal(tgt.getType())) {
+            continue;
+          }
+
+          RexNode expr = cast(srcField.getName(), tgt.getType());
+          checkBetweenSelectivity(3, VALUES.length, VALUES.length, expr, 5, 7);
+        } catch (AssertionError e) {
+          throw new AssertionError("Error when casting from " + srcField.getType() + " to " + tgt.getType(), e);
+        }
+      }
+    }
+  }
+
+  private boolean isTemporal(RelDataType type) {
+    return type.getSqlTypeName() == TIMESTAMP || type.getSqlTypeName() == SqlTypeName.DATE;
+  }
+
+  @Test
+  public void testRangePredicateWithCast() {
+    useFieldWithValues("f_numeric", VALUES, KLL);
+    checkSelectivity(3 / 13.f, ge(cast("f_numeric", TINYINT), int5));
+    checkSelectivity(10 / 13.f, lt(cast("f_numeric", TINYINT), int5));
+    checkSelectivity(2 / 13.f, gt(cast("f_numeric", TINYINT), int5));
+    checkSelectivity(11 / 13.f, le(cast("f_numeric", TINYINT), int5));
+
+    checkSelectivity(12 / 13f, ge(cast("f_numeric", TINYINT), int2));
+    checkSelectivity(1 / 13f, lt(cast("f_numeric", TINYINT), int2));
+    checkSelectivity(5 / 13f, gt(cast("f_numeric", TINYINT), int2));
+    checkSelectivity(8 / 13f, le(cast("f_numeric", TINYINT), int2));
+
+    // check some types
+    checkSelectivity(3 / 13.f, ge(cast("f_numeric", INTEGER), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_numeric", SMALLINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_numeric", BIGINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_numeric", SqlTypeName.FLOAT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_numeric", SqlTypeName.DOUBLE), int5));
+  }
+
+  @Test
+  public void testRangePredicateWithCast2() {
+    useFieldWithValues("f_numeric", VALUES2, KLL2);
+    RelDataType decimal3s1 = decimalType(3, 1);
+    checkSelectivity(4 / 28.f, ge(cast("f_numeric", decimal3s1), literalFloat(1)));
+
+    // values from -99.94999 to 99.94999 (both inclusive)
+    checkSelectivity(7 / 28.f, lt(cast("f_numeric", decimal3s1), literalFloat(100)));
+    checkSelectivity(7 / 28.f, le(cast("f_numeric", decimal3s1), literalFloat(100)));
+    checkSelectivity(0 / 28.f, gt(cast("f_numeric", decimal3s1), literalFloat(100)));
+    checkSelectivity(0 / 28.f, ge(cast("f_numeric", decimal3s1), literalFloat(100)));
+
+    RelDataType decimal4s1 = decimalType(4, 1);
+    checkSelectivity(10 / 28.f, lt(cast("f_numeric", decimal4s1), literalFloat(100)));
+    checkSelectivity(20 / 28.f, le(cast("f_numeric", decimal4s1), literalFloat(100)));
+    checkSelectivity(3 / 28.f, gt(cast("f_numeric", decimal4s1), literalFloat(100)));
+    checkSelectivity(13 / 28.f, ge(cast("f_numeric", decimal4s1), literalFloat(100)));
+
+    RelDataType decimal2s1 = decimalType(2, 1);
+    checkSelectivity(2 / 28.f, lt(cast("f_numeric", decimal2s1), literalFloat(100)));
+    checkSelectivity(2 / 28.f, le(cast("f_numeric", decimal2s1), literalFloat(100)));
+    checkSelectivity(0 / 28.f, gt(cast("f_numeric", decimal2s1), literalFloat(100)));
+    checkSelectivity(0 / 28.f, ge(cast("f_numeric", decimal2s1), literalFloat(100)));
+
+    // expected: 100_000f
+    RelDataType decimal7s1 = decimalType(7, 1);
+    checkSelectivity(1 / 28.f, gt(cast("f_numeric", decimal7s1), literalFloat(10000)));
+
+    // expected: 10_000f, 100_000f, because CAST(1_000_000 AS DECIMAL(7,1)) = NULL, and similar for even larger values
+    checkSelectivity(2 / 28.f, ge(cast("f_numeric", decimal7s1), literalFloat(9999)));
+    checkSelectivity(2 / 28.f, ge(cast("f_numeric", decimal7s1), literalFloat(10000)));
+
+    // expected: 100_000f
+    checkSelectivity(1 / 28.f, gt(cast("f_numeric", decimal7s1), literalFloat(10000)));
+    checkSelectivity(1 / 28.f, gt(cast("f_numeric", decimal7s1), literalFloat(10001)));
+
+    // expected 1f, 10f, 99.94998f, 99.94999f
+    checkSelectivity(4 / 28.f, ge(cast("f_numeric", decimal3s1), literalFloat(1)));
+    checkSelectivity(3 / 28.f, gt(cast("f_numeric", decimal3s1), literalFloat(1)));
+    // expected -99.94999f, -99.94998f, 0f, 1f
+    checkSelectivity(4 / 28.f, le(cast("f_numeric", decimal3s1), literalFloat(1)));
+    checkSelectivity(3 / 28.f, lt(cast("f_numeric", decimal3s1), literalFloat(1)));
+  }
+
+  private void checkTimeFieldOnMidnightTimestamps(RexNode field) {
+    // note: use only values from VALUES_TIME that specify a date without hh:mm:ss!
+    checkSelectivity(7 / 7.f, ge(field, literalTimestamp("2020-11-01")));
+    checkSelectivity(5 / 7.f, ge(field, literalTimestamp("2020-11-03")));
+    checkSelectivity(1 / 7.f, ge(field, literalTimestamp("2020-11-07")));
+
+    checkSelectivity(6 / 7.f, gt(field, literalTimestamp("2020-11-01")));
+    checkSelectivity(4 / 7.f, gt(field, literalTimestamp("2020-11-03")));
+    checkSelectivity(0 / 7.f, gt(field, literalTimestamp("2020-11-07")));
+
+    checkSelectivity(1 / 7.f, le(field, literalTimestamp("2020-11-01")));
+    checkSelectivity(3 / 7.f, le(field, literalTimestamp("2020-11-03")));
+    checkSelectivity(7 / 7.f, le(field, literalTimestamp("2020-11-07")));
+
+    checkSelectivity(0 / 7.f, lt(field, literalTimestamp("2020-11-01")));
+    checkSelectivity(2 / 7.f, lt(field, literalTimestamp("2020-11-03")));
+    checkSelectivity(6 / 7.f, lt(field, literalTimestamp("2020-11-07")));
+  }
+
+  private void checkTimeFieldOnIntraDayTimestamps(RexNode field) {
+    checkSelectivity(3 / 7.f, ge(field, literalTimestamp("2020-11-05T11:23:45Z")));
+    checkSelectivity(2 / 7.f, gt(field, literalTimestamp("2020-11-05T11:23:45Z")));
+    checkSelectivity(5 / 7.f, le(field, literalTimestamp("2020-11-05T11:23:45Z")));
+    checkSelectivity(4 / 7.f, lt(field, literalTimestamp("2020-11-05T11:23:45Z")));
+  }
+
+  @Test
+  public void testRangePredicateOnTimestamp() {
+    useFieldWithValues("f_timestamp", VALUES_TIME, KLL_TIME);
+    checkTimeFieldOnMidnightTimestamps(currentInputRef);
+    checkTimeFieldOnIntraDayTimestamps(currentInputRef);
+  }
+
+  @Test
+  public void testRangePredicateOnTimestampWithCast() {
+    useFieldWithValues("f_timestamp", VALUES_TIME, KLL_TIME);
+    RexNode expr1 = cast("f_timestamp", SqlTypeName.DATE);
+    checkTimeFieldOnMidnightTimestamps(expr1);
+    checkTimeFieldOnIntraDayTimestamps(expr1);
+
+    RexNode expr2 = cast("f_timestamp", SqlTypeName.TIMESTAMP);
+    checkTimeFieldOnMidnightTimestamps(expr2);
+    checkTimeFieldOnIntraDayTimestamps(expr2);
+  }
+
+  @Test
+  public void testRangePredicateOnDate() {
+    useFieldWithValues("f_date", VALUES_TIME, KLL_TIME);
+    checkTimeFieldOnMidnightTimestamps(currentInputRef);
+
+    // it does not make sense to compare with "2020-11-05T11:23:45Z",
+    // as that value would not be stored as-is in a date column, but as "2020-11-05" instead
+  }
+
+  @Test
+  public void testRangePredicateOnDateWithCast() {

Review Comment:
   I'll add them.



##########
ql/src/test/org/apache/hadoop/hive/ql/optimizer/calcite/stats/TestFilterSelectivityEstimator.java:
##########
@@ -511,6 +605,428 @@ public void testComputeRangePredicateSelectivityNotBetweenWithNULLS() {
     doReturn(Collections.singletonList(stats)).when(tableMock).getColStat(Collections.singletonList(0));
     RexNode filter = REX_BUILDER.makeCall(HiveBetween.INSTANCE, boolTrue, inputRef0, int1, int3);
     FilterSelectivityEstimator estimator = new FilterSelectivityEstimator(scan, mq);
-    Assert.assertEquals(0.55, estimator.estimateSelectivity(filter), DELTA);
+    // only the values 4, 5, 6, 7 fulfill the condition NOT BETWEEN 1 AND 3
+    // (the NULL values do not fulfill the condition)
+    Assert.assertEquals(0.2, estimator.estimateSelectivity(filter), DELTA);
+  }
+
+  @Test
+  public void testRangePredicateCastIntegerValuesInsideTypeRange() {
+    // use VALUES, whose values fit into all of the tested integer types
+    // we're only interested in whether the cast leaves the selectivity unchanged
+    useFieldWithValues("f_tinyint", VALUES, KLL);
+    checkSelectivity(3 / 13.f, ge(cast("f_tinyint", TINYINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_tinyint", SMALLINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_tinyint", INTEGER), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_tinyint", BIGINT), int5));
+
+    useFieldWithValues("f_smallint", VALUES, KLL);
+    checkSelectivity(3 / 13.f, ge(cast("f_smallint", TINYINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_smallint", SMALLINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_smallint", INTEGER), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_smallint", BIGINT), int5));
+
+    useFieldWithValues("f_integer", VALUES, KLL);
+    checkSelectivity(3 / 13.f, ge(cast("f_integer", TINYINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_integer", SMALLINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_integer", INTEGER), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_integer", BIGINT), int5));
+
+    useFieldWithValues("f_bigint", VALUES, KLL);
+    checkSelectivity(3 / 13.f, ge(cast("f_bigint", TINYINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_bigint", SMALLINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_bigint", INTEGER), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_bigint", BIGINT), int5));
+  }
+
+  @Test
+  public void testRangePredicateCastIntegerValuesOutsideTypeRange() {
+    // use VALUES2, even if the tested types cannot represent its values
+    // we're only interested in whether the cast to a smaller integer type results in the default selectivity
+    useFieldWithValues("f_tinyint", VALUES2, KLL2);
+    checkSelectivity(16 / 28.f, ge(cast("f_tinyint", TINYINT), int5));
+    checkSelectivity(18 / 28.f, ge(cast("f_tinyint", SMALLINT), int5));
+    checkSelectivity(20 / 28.f, ge(cast("f_tinyint", INTEGER), int5));
+    checkSelectivity(20 / 28.f, ge(cast("f_tinyint", BIGINT), int5));
+
+    useFieldWithValues("f_smallint", VALUES2, KLL2);
+    checkSelectivity(1 / 3.f, ge(cast("f_smallint", TINYINT), int5));
+    checkSelectivity(18 / 28.f, ge(cast("f_smallint", SMALLINT), int5));
+    checkSelectivity(20 / 28.f, ge(cast("f_smallint", INTEGER), int5));
+    checkSelectivity(20 / 28.f, ge(cast("f_smallint", BIGINT), int5));
+
+    useFieldWithValues("f_integer", VALUES2, KLL2);
+    checkSelectivity(1 / 3.f, ge(cast("f_integer", TINYINT), int5));
+    checkSelectivity(1 / 3.f, ge(cast("f_integer", SMALLINT), int5));
+    checkSelectivity(20 / 28.f, ge(cast("f_integer", INTEGER), int5));
+    checkSelectivity(20 / 28.f, ge(cast("f_integer", BIGINT), int5));
+
+    useFieldWithValues("f_bigint", VALUES2, KLL2);
+    checkSelectivity(1 / 3.f, ge(cast("f_bigint", TINYINT), int5));
+    checkSelectivity(1 / 3.f, ge(cast("f_bigint", SMALLINT), int5));
+    checkSelectivity(1 / 3.f, ge(cast("f_bigint", INTEGER), int5));
+    checkSelectivity(20 / 28.f, ge(cast("f_bigint", BIGINT), int5));
+  }
+
+  @Test
+  public void testRangePredicateTypeMatrix() {
+    // checks many possible combinations of types
+    List<RelDataTypeField> fields = tableType.getFieldList();
+    for (var srcField : fields) {
+      if (isTemporal(srcField.getType())) {
+        continue;
+      }
+
+      useFieldWithValues(srcField.getName(), VALUES, KLL);
+
+      for (var tgt : fields) {
+        try {
+          if (isTemporal(tgt.getType())) {
+            continue;
+          }
+
+          RexNode expr = cast(srcField.getName(), tgt.getType());
+          checkBetweenSelectivity(3, VALUES.length, VALUES.length, expr, 5, 7);
+        } catch (AssertionError e) {
+          throw new AssertionError("Error when casting from " + srcField.getType() + " to " + tgt.getType(), e);
+        }

Review Comment:
   If we remove the wrapping, we will lose the information about which (srcType, tgtType) combination caused the problem.

   Here is a comparison. With the wrapping of the assertion:
   
   ```
   java.lang.AssertionError: Error when casting from DECIMAL(38, 25) to DECIMAL(10, 3)

        at org.apache.hadoop.hive.ql.optimizer.calcite.stats.TestFilterSelectivityEstimator.testCastMatrix(TestFilterSelectivityEstimator.java:661)
        at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
        at java.base/java.lang.reflect.Method.invoke(Method.java:580)
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
        at org.mockito.internal.runners.DefaultInternalRunner$1$1.evaluate(DefaultInternalRunner.java:55)
        at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
        at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
        at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
        at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
        at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
        at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
        at org.mockito.internal.runners.DefaultInternalRunner$1.run(DefaultInternalRunner.java:100)
        at org.mockito.internal.runners.DefaultInternalRunner.run(DefaultInternalRunner.java:107)
        at org.mockito.internal.runners.StrictRunner.run(StrictRunner.java:42)
        at org.mockito.junit.MockitoJUnitRunner.run(MockitoJUnitRunner.java:163)
        at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
        at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
        at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
        at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
        at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
        at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:231)
        at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:55)
   Caused by: java.lang.AssertionError: expected:<1> but was:<2>
        at org.junit.Assert.fail(Assert.java:89)
        at org.junit.Assert.failNotEquals(Assert.java:835)
        at org.junit.Assert.assertEquals(Assert.java:647)
        at org.junit.Assert.assertEquals(Assert.java:633)
        at org.apache.hadoop.hive.ql.optimizer.calcite.stats.TestFilterSelectivityEstimator.testCastMatrix(TestFilterSelectivityEstimator.java:658)
        ... 32 more
   ```
   
   Without the wrapping, there is no mention of the types DECIMAL(38, 25) or DECIMAL(10, 3):
   ```
   java.lang.AssertionError:
   Expected :1
   Actual   :2

        at org.junit.Assert.fail(Assert.java:89)
        at org.junit.Assert.failNotEquals(Assert.java:835)
        at org.junit.Assert.assertEquals(Assert.java:647)
        at org.junit.Assert.assertEquals(Assert.java:633)
        at org.apache.hadoop.hive.ql.optimizer.calcite.stats.TestFilterSelectivityEstimator.testCastMatrix(TestFilterSelectivityEstimator.java:659)
        at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
        at java.base/java.lang.reflect.Method.invoke(Method.java:580)
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
        at org.mockito.internal.runners.DefaultInternalRunner$1$1.evaluate(DefaultInternalRunner.java:55)
        at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
        at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
        at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
        at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
        at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
        at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
        at org.mockito.internal.runners.DefaultInternalRunner$1.run(DefaultInternalRunner.java:100)
        at org.mockito.internal.runners.DefaultInternalRunner.run(DefaultInternalRunner.java:107)
        at org.mockito.internal.runners.StrictRunner.run(StrictRunner.java:42)
        at org.mockito.junit.MockitoJUnitRunner.run(MockitoJUnitRunner.java:163)
        at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
        at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
        at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
        at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
        at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
        at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:231)
        at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:55)
   ```
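   The wrap-and-rethrow pattern being defended here boils down to a few lines. A minimal self-contained sketch (names are illustrative, not the PR's actual helpers): the outer AssertionError carries the (srcType, tgtType) context while the original failure survives as the cause.

   ```java
   public class WrapSketch {

       // Run an assertion and, on failure, rethrow with the type combination as context.
       static void checkWithContext(String srcType, String tgtType, Runnable assertion) {
           try {
               assertion.run();
           } catch (AssertionError e) {
               throw new AssertionError("Error when casting from " + srcType + " to " + tgtType, e);
           }
       }

       public static void main(String[] args) {
           try {
               checkWithContext("DECIMAL(38, 25)", "DECIMAL(10, 3)",
                   () -> { throw new AssertionError("expected:<1> but was:<2>"); });
           } catch (AssertionError e) {
               System.out.println(e.getMessage());            // the type combination
               System.out.println(e.getCause().getMessage()); // the original failure
           }
       }
   }
   ```

   Both pieces of information end up in the report, which is exactly what the plain assertion loses.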



##########
ql/src/test/org/apache/hadoop/hive/ql/optimizer/calcite/stats/TestFilterSelectivityEstimator.java:
##########
@@ -511,6 +605,428 @@ public void testComputeRangePredicateSelectivityNotBetweenWithNULLS() {
     doReturn(Collections.singletonList(stats)).when(tableMock).getColStat(Collections.singletonList(0));
     RexNode filter = REX_BUILDER.makeCall(HiveBetween.INSTANCE, boolTrue, inputRef0, int1, int3);
     FilterSelectivityEstimator estimator = new FilterSelectivityEstimator(scan, mq);
-    Assert.assertEquals(0.55, estimator.estimateSelectivity(filter), DELTA);
+    // only the values 4, 5, 6, 7 fulfill the condition NOT BETWEEN 1 AND 3
+    // (the NULL values do not fulfill the condition)
+    Assert.assertEquals(0.2, estimator.estimateSelectivity(filter), DELTA);
+  }
+
+  @Test
+  public void testRangePredicateCastIntegerValuesInsideTypeRange() {
+    // use VALUES, whose values fit into all of the tested integer types
+    // we're only interested in whether the cast leaves the selectivity unchanged
+    useFieldWithValues("f_tinyint", VALUES, KLL);
+    checkSelectivity(3 / 13.f, ge(cast("f_tinyint", TINYINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_tinyint", SMALLINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_tinyint", INTEGER), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_tinyint", BIGINT), int5));
+
+    useFieldWithValues("f_smallint", VALUES, KLL);
+    checkSelectivity(3 / 13.f, ge(cast("f_smallint", TINYINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_smallint", SMALLINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_smallint", INTEGER), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_smallint", BIGINT), int5));
+
+    useFieldWithValues("f_integer", VALUES, KLL);
+    checkSelectivity(3 / 13.f, ge(cast("f_integer", TINYINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_integer", SMALLINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_integer", INTEGER), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_integer", BIGINT), int5));
+
+    useFieldWithValues("f_bigint", VALUES, KLL);
+    checkSelectivity(3 / 13.f, ge(cast("f_bigint", TINYINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_bigint", SMALLINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_bigint", INTEGER), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_bigint", BIGINT), int5));
+  }
+
+  @Test
+  public void testRangePredicateCastIntegerValuesOutsideTypeRange() {
+    // use VALUES2, even if the tested types cannot represent its values
+    // we're only interested in whether the cast to a smaller integer type results in the default selectivity
+    useFieldWithValues("f_tinyint", VALUES2, KLL2);
+    checkSelectivity(16 / 28.f, ge(cast("f_tinyint", TINYINT), int5));
+    checkSelectivity(18 / 28.f, ge(cast("f_tinyint", SMALLINT), int5));
+    checkSelectivity(20 / 28.f, ge(cast("f_tinyint", INTEGER), int5));
+    checkSelectivity(20 / 28.f, ge(cast("f_tinyint", BIGINT), int5));
+
+    useFieldWithValues("f_smallint", VALUES2, KLL2);
+    checkSelectivity(1 / 3.f, ge(cast("f_smallint", TINYINT), int5));
+    checkSelectivity(18 / 28.f, ge(cast("f_smallint", SMALLINT), int5));
+    checkSelectivity(20 / 28.f, ge(cast("f_smallint", INTEGER), int5));
+    checkSelectivity(20 / 28.f, ge(cast("f_smallint", BIGINT), int5));
+
+    useFieldWithValues("f_integer", VALUES2, KLL2);
+    checkSelectivity(1 / 3.f, ge(cast("f_integer", TINYINT), int5));
+    checkSelectivity(1 / 3.f, ge(cast("f_integer", SMALLINT), int5));
+    checkSelectivity(20 / 28.f, ge(cast("f_integer", INTEGER), int5));
+    checkSelectivity(20 / 28.f, ge(cast("f_integer", BIGINT), int5));
+
+    useFieldWithValues("f_bigint", VALUES2, KLL2);
+    checkSelectivity(1 / 3.f, ge(cast("f_bigint", TINYINT), int5));
+    checkSelectivity(1 / 3.f, ge(cast("f_bigint", SMALLINT), int5));
+    checkSelectivity(1 / 3.f, ge(cast("f_bigint", INTEGER), int5));
+    checkSelectivity(20 / 28.f, ge(cast("f_bigint", BIGINT), int5));
+  }
+
+  @Test
+  public void testRangePredicateTypeMatrix() {
+    // checks many possible combinations of types
+    List<RelDataTypeField> fields = tableType.getFieldList();
+    for (var srcField : fields) {
+      if (isTemporal(srcField.getType())) {
+        continue;
+      }
+
+      useFieldWithValues(srcField.getName(), VALUES, KLL);
+
+      for (var tgt : fields) {
+        try {
+          if (isTemporal(tgt.getType())) {
+            continue;
+          }
+
+          RexNode expr = cast(srcField.getName(), tgt.getType());
+          checkBetweenSelectivity(3, VALUES.length, VALUES.length, expr, 5, 7);
+        } catch (AssertionError e) {
+          throw new AssertionError("Error when casting from " + srcField.getType() + " to " + tgt.getType(), e);
+        }
+      }
+    }
+  }
+
+  private boolean isTemporal(RelDataType type) {
+    return type.getSqlTypeName() == TIMESTAMP || type.getSqlTypeName() == SqlTypeName.DATE;
+  }
+
+  @Test
+  public void testRangePredicateWithCast() {
+    useFieldWithValues("f_numeric", VALUES, KLL);
+    checkSelectivity(3 / 13.f, ge(cast("f_numeric", TINYINT), int5));
+    checkSelectivity(10 / 13.f, lt(cast("f_numeric", TINYINT), int5));
+    checkSelectivity(2 / 13.f, gt(cast("f_numeric", TINYINT), int5));
+    checkSelectivity(11 / 13.f, le(cast("f_numeric", TINYINT), int5));
+
+    checkSelectivity(12 / 13f, ge(cast("f_numeric", TINYINT), int2));
+    checkSelectivity(1 / 13f, lt(cast("f_numeric", TINYINT), int2));
+    checkSelectivity(5 / 13f, gt(cast("f_numeric", TINYINT), int2));
+    checkSelectivity(8 / 13f, le(cast("f_numeric", TINYINT), int2));
+
+    // check some types
+    checkSelectivity(3 / 13.f, ge(cast("f_numeric", INTEGER), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_numeric", SMALLINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_numeric", BIGINT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_numeric", SqlTypeName.FLOAT), int5));
+    checkSelectivity(3 / 13.f, ge(cast("f_numeric", SqlTypeName.DOUBLE), int5));
+  }
+
+  @Test
+  public void testRangePredicateWithCast2() {
+    useFieldWithValues("f_numeric", VALUES2, KLL2);
+    RelDataType decimal3s1 = decimalType(3, 1);
+    checkSelectivity(4 / 28.f, ge(cast("f_numeric", decimal3s1), 
literalFloat(1)));
+
+    // values from -99.94999 to 99.94999 (both inclusive)
+    checkSelectivity(7 / 28.f, lt(cast("f_numeric", decimal3s1), 
literalFloat(100)));
+    checkSelectivity(7 / 28.f, le(cast("f_numeric", decimal3s1), 
literalFloat(100)));
+    checkSelectivity(0 / 28.f, gt(cast("f_numeric", decimal3s1), 
literalFloat(100)));
+    checkSelectivity(0 / 28.f, ge(cast("f_numeric", decimal3s1), 
literalFloat(100)));
+
+    RelDataType decimal4s1 = decimalType(4, 1);
+    checkSelectivity(10 / 28.f, lt(cast("f_numeric", decimal4s1), 
literalFloat(100)));
+    checkSelectivity(20 / 28.f, le(cast("f_numeric", decimal4s1), 
literalFloat(100)));
+    checkSelectivity(3 / 28.f, gt(cast("f_numeric", decimal4s1), 
literalFloat(100)));
+    checkSelectivity(13 / 28.f, ge(cast("f_numeric", decimal4s1), 
literalFloat(100)));
+
+    RelDataType decimal2s1 = decimalType(2, 1);
+    checkSelectivity(2 / 28.f, lt(cast("f_numeric", decimal2s1), literalFloat(100)));
+    checkSelectivity(2 / 28.f, le(cast("f_numeric", decimal2s1), literalFloat(100)));
+    checkSelectivity(0 / 28.f, gt(cast("f_numeric", decimal2s1), literalFloat(100)));
+    checkSelectivity(0 / 28.f, ge(cast("f_numeric", decimal2s1), literalFloat(100)));
+
+    // expected: 100_000f
+    RelDataType decimal7s1 = decimalType(7, 1);
+    checkSelectivity(1 / 28.f, gt(cast("f_numeric", decimal7s1), literalFloat(10000)));
+
+    // expected: 10_000f, 100_000f, because CAST(1_000_000 AS DECIMAL(7,1)) = NULL, and similar for even larger values
+    checkSelectivity(2 / 28.f, ge(cast("f_numeric", decimal7s1), literalFloat(9999)));
+    checkSelectivity(2 / 28.f, ge(cast("f_numeric", decimal7s1), literalFloat(10000)));
+
+    // expected: 100_000f
+    checkSelectivity(1 / 28.f, gt(cast("f_numeric", decimal7s1), literalFloat(10000)));
+    checkSelectivity(1 / 28.f, gt(cast("f_numeric", decimal7s1), literalFloat(10001)));
+
+    // expected 1f, 10f, 99.94998f, 99.94999f
+    checkSelectivity(4 / 28.f, ge(cast("f_numeric", decimal3s1), literalFloat(1)));
+    checkSelectivity(3 / 28.f, gt(cast("f_numeric", decimal3s1), literalFloat(1)));
+    // expected -99.94999f, -99.94998f, 0f, 1f
+    checkSelectivity(4 / 28.f, le(cast("f_numeric", decimal3s1), literalFloat(1)));
+    checkSelectivity(3 / 28.f, lt(cast("f_numeric", decimal3s1), literalFloat(1)));
+  }
+
+  private void checkTimeFieldOnMidnightTimestamps(RexNode field) {
+    // note: use only values from VALUES_TIME that specify a date without hh:mm:ss!
+    checkSelectivity(7 / 7.f, ge(field, literalTimestamp("2020-11-01")));
+    checkSelectivity(5 / 7.f, ge(field, literalTimestamp("2020-11-03")));
+    checkSelectivity(1 / 7.f, ge(field, literalTimestamp("2020-11-07")));
+
+    checkSelectivity(6 / 7.f, gt(field, literalTimestamp("2020-11-01")));
+    checkSelectivity(4 / 7.f, gt(field, literalTimestamp("2020-11-03")));
+    checkSelectivity(0 / 7.f, gt(field, literalTimestamp("2020-11-07")));
+
+    checkSelectivity(1 / 7.f, le(field, literalTimestamp("2020-11-01")));
+    checkSelectivity(3 / 7.f, le(field, literalTimestamp("2020-11-03")));
+    checkSelectivity(7 / 7.f, le(field, literalTimestamp("2020-11-07")));
+
+    checkSelectivity(0 / 7.f, lt(field, literalTimestamp("2020-11-01")));
+    checkSelectivity(2 / 7.f, lt(field, literalTimestamp("2020-11-03")));
+    checkSelectivity(6 / 7.f, lt(field, literalTimestamp("2020-11-07")));
+  }
+
+  private void checkTimeFieldOnIntraDayTimestamps(RexNode field) {
+    checkSelectivity(3 / 7.f, ge(field, literalTimestamp("2020-11-05T11:23:45Z")));
+    checkSelectivity(2 / 7.f, gt(field, literalTimestamp("2020-11-05T11:23:45Z")));
+    checkSelectivity(5 / 7.f, le(field, literalTimestamp("2020-11-05T11:23:45Z")));
+    checkSelectivity(4 / 7.f, lt(field, literalTimestamp("2020-11-05T11:23:45Z")));
+  }
+
+  @Test
+  public void testRangePredicateOnTimestamp() {
+    useFieldWithValues("f_timestamp", VALUES_TIME, KLL_TIME);
+    checkTimeFieldOnMidnightTimestamps(currentInputRef);
+    checkTimeFieldOnIntraDayTimestamps(currentInputRef);
+  }
+
+  @Test
+  public void testRangePredicateOnTimestampWithCast() {
+    useFieldWithValues("f_timestamp", VALUES_TIME, KLL_TIME);
+    RexNode expr1 = cast("f_timestamp", SqlTypeName.DATE);
+    checkTimeFieldOnMidnightTimestamps(expr1);
+    checkTimeFieldOnIntraDayTimestamps(expr1);
+
+    RexNode expr2 = cast("f_timestamp", SqlTypeName.TIMESTAMP);
+    checkTimeFieldOnMidnightTimestamps(expr2);
+    checkTimeFieldOnIntraDayTimestamps(expr2);
+  }
+
+  @Test
+  public void testRangePredicateOnDate() {
+    useFieldWithValues("f_date", VALUES_TIME, KLL_TIME);
+    checkTimeFieldOnMidnightTimestamps(currentInputRef);
+
+    // it does not make sense to compare with "2020-11-05T11:23:45Z",
+    // as that value would not be stored as-is in a date column, but as "2020-11-05" instead
+  }
+
+  @Test
+  public void testRangePredicateOnDateWithCast() {
+    useFieldWithValues("f_date", VALUES_TIME, KLL_TIME);
+    checkTimeFieldOnMidnightTimestamps(cast("f_date", SqlTypeName.DATE));
+    checkTimeFieldOnMidnightTimestamps(cast("f_date", SqlTypeName.TIMESTAMP));
+
+    // it does not make sense to compare with "2020-11-05T11:23:45Z",
+    // as that value would not be stored as-is in a date column, but as "2020-11-05" instead
+  }
+
+  @Test
+  public void testBetweenWithCastToTinyIntCheckRounding() {
+    useFieldWithValues("f_numeric", VALUES3, KLL3);
+    float total = VALUES3.length;
+    float universe = 10; // the number of values that "survive" the cast
+    RexNode cast = cast("f_numeric", TINYINT);
+    // check rounding of positive numbers
+    checkBetweenSelectivity(3, universe, total, cast, 0, 10);
+    checkBetweenSelectivity(4, universe, total, cast, 0, 11);
+    checkBetweenSelectivity(4, universe, total, cast, 10, 20);
+    checkBetweenSelectivity(1, universe, total, cast, 11, 20);
+
+    // check rounding of negative numbers
+    checkBetweenSelectivity(4, universe, total, cast, -20, -10);
+    checkBetweenSelectivity(1, universe, total, cast, -20, -11);
+    checkBetweenSelectivity(3, universe, total, cast, -10, 0);
+    checkBetweenSelectivity(4, universe, total, cast, -11, 0);
+  }
+
+  @Test
+  public void testBetweenWithCastToTinyInt() {
+    useFieldWithValues("f_numeric", VALUES3, KLL3);
+    float total = VALUES3.length;
+    float universe = 10; // the number of values that "survive" the cast
+    RexNode cast = cast("f_numeric", TINYINT);
+    checkBetweenSelectivity(5, universe, total, cast, 0, 1e20f);
+    checkBetweenSelectivity(5, universe, total, cast, -1e20f, 0);
+    checkBetweenSelectivity(0, universe, total, cast, 100f, 0f);
+  }
+
+  @Test
+  public void testBetweenWithCastToSmallInt() {
+    useFieldWithValues("f_numeric", VALUES3, KLL3);
+    float total = VALUES3.length;
+    float universe = 14; // the number of values that "survive" the cast
+    RexNode cast = cast("f_numeric", SMALLINT);
+    checkBetweenSelectivity(7, universe, total, cast, 0, 1e20f);
+    checkBetweenSelectivity(7, universe, total, cast, -1e20f, 0);
+    checkBetweenSelectivity(0, universe, total, cast, 100f, 0f);
+  }
+
+  @Test
+  public void testBetweenWithCastToInteger() {
+    useFieldWithValues("f_numeric", VALUES3, KLL3);
+    float total = VALUES3.length;
+    float universe = 18; // the number of values that "survive" the cast
+    RexNode cast = cast("f_numeric", INTEGER);
+    checkBetweenSelectivity(9, universe, total, cast, 0, 1e20f);
+    checkBetweenSelectivity(9, universe, total, cast, -1e20f, 0);
+    checkBetweenSelectivity(0, universe, total, cast, 100f, 0f);
+  }
+
+  @Test
+  public void testBetweenWithCastToBigInt() {

Review Comment:
   I've added test cases wherever I saw a boundary case. The predicates were executed with Hive to get the [real expected values](https://github.com/thomasrebele/hive/blob/47e8f4e203110d2c281d58503cc83b9b5653c106/ql/src/test/results/clientpositive/llap/stats_histogram_HIVE-29424.q.out). These expected values depend on many factors (source type, target type, the actual values in the data, the boundaries, the types of the boundaries) [*]. Getting the expected values for all combinations would be a lot of work, so I think we should choose wisely.
   
   I've included your proposal of adding a test that loops through the possible types. That's indeed a nice idea, and I'll extend it to cover both range predicates and BETWEEN. However, due to the nature of the problem, this matrix only covers a small part of the search space, i.e., only those executions that neither round while casting nor cast some of the input values to NULL. If we want to include those cases in such a matrix, we would need the expected result for each combination of input parameters [*].
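   To illustrate why those NULL-producing casts matter: a NULL never satisfies a range predicate, so rows whose values overflow the target type drop out of the numerator. A minimal sketch of that effect (plain Python, not Hive or the estimator's actual code; the helper names and the value distribution are made up for illustration):

```python
# Sketch: selectivity of "CAST(col AS DECIMAL(7,1)) >= threshold" when some
# values overflow the target type. In Hive, an out-of-range CAST to DECIMAL
# yields NULL, and NULL never satisfies a range predicate.

def cast_or_null(v, lo, hi):
    """Return v if it fits the target type's range, else None (Hive: NULL)."""
    return v if lo <= v <= hi else None

def selectivity_ge(values, threshold, lo, hi):
    casted = [cast_or_null(v, lo, hi) for v in values]
    matching = sum(1 for c in casted if c is not None and c >= threshold)
    return matching / len(values)  # NULL rows stay in the denominator

# DECIMAL(7,1) holds at most 999_999.9, so 1_000_000 is "lost" by the cast:
values = [10_000.0, 100_000.0, 1_000_000.0] + [0.0] * 25   # 28 rows
sel = selectivity_ge(values, 10_000, -999_999.9, 999_999.9)
print(sel)  # 0.07142857142857142, i.e. 2/28: only 10_000 and 100_000 match
```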
   
   I'm not sure how to improve this substantially without spending several more days refactoring the test cases.
   
   PS: At first I had many of the BETWEEN checks grouped together in a single method, but I later separated them to avoid some SonarQube warnings about nested blocks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

