This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.12 by this push:
new 52d1adc [FLINK-20366][table-planner-blink]
ColumnIntervalUtil#getColumnIntervalWithFilter should consider constant
predicate
52d1adc is described below
commit 52d1adcfd159bf8e952818eb08baddc568d7658a
Author: godfrey he <[email protected]>
AuthorDate: Fri Nov 27 17:30:52 2020 +0800
[FLINK-20366][table-planner-blink]
ColumnIntervalUtil#getColumnIntervalWithFilter should consider constant
predicate
(cherry picked from commit 7bf76c0b41a68ace751d4af48efc1edc2ed2d6c7)
---
.../planner/plan/utils/ColumnIntervalUtil.scala | 28 ++++---
.../{ => plan}/utils/ColumnIntervalUtilTest.scala | 89 +++++++++++++++++++++-
2 files changed, 106 insertions(+), 11 deletions(-)
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ColumnIntervalUtil.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ColumnIntervalUtil.scala
index 5bb762d..ee1703d 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ColumnIntervalUtil.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ColumnIntervalUtil.scala
@@ -19,8 +19,7 @@
package org.apache.flink.table.planner.plan.utils
import org.apache.flink.table.planner.plan.stats._
-import
org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil.ColumnRelatedVisitor
-import
org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil.getLiteralValueByBroadType
+import
org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil.{ColumnRelatedVisitor,
getLiteralValueByBroadType}
import org.apache.calcite.rex.{RexBuilder, RexCall, RexInputRef, RexLiteral,
RexNode, RexUtil}
import org.apache.calcite.sql.SqlKind
@@ -210,13 +209,24 @@ object ColumnIntervalUtil {
}
val interval = relatedSubRexNode match {
case Some(rexNode) =>
- val orParts = RexUtil.flattenOr(Vector(RexUtil.toDnf(rexBuilder,
rexNode)))
- orParts.map(or => {
- val andParts = RexUtil.flattenAnd(Vector(or))
- val andIntervals = andParts.map(and =>
columnIntervalOfSinglePredicate(and))
- val res = andIntervals.filter(_ !=
null).foldLeft(beginInterval)(ValueInterval.intersect)
- res
- }).reduceLeft(ValueInterval.union)
+ if (rexNode.isAlwaysTrue) {
+ beginInterval
+ } else if (rexNode.isAlwaysFalse) {
+ ValueInterval.empty
+ } else if (RexUtil.isConstant(rexNode)) {
+ // this should not happen, just protect the following code
+ ValueInterval.infinite
+ } else {
+ val orParts = RexUtil.flattenOr(Vector(RexUtil.toDnf(rexBuilder,
rexNode)))
+ orParts.map(or => {
+ val andParts = RexUtil.flattenAnd(Vector(or))
+ val andIntervals = andParts.map(and =>
columnIntervalOfSinglePredicate(and))
+ val res = andIntervals
+ .filter(_ != null)
+ .foldLeft(beginInterval)(ValueInterval.intersect)
+ res
+ }).reduceLeft(ValueInterval.union)
+ }
case _ => beginInterval
}
if (interval == ValueInterval.infinite) null else interval
diff --git
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/ColumnIntervalUtilTest.scala
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/ColumnIntervalUtilTest.scala
similarity index 70%
rename from
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/ColumnIntervalUtilTest.scala
rename to
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/ColumnIntervalUtilTest.scala
index 183634c..cf1964c 100644
---
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/ColumnIntervalUtilTest.scala
+++
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/ColumnIntervalUtilTest.scala
@@ -16,12 +16,16 @@
* limitations under the License.
*/
-package org.apache.flink.table.planner.utils
+package org.apache.flink.table.planner.plan.utils
+import org.apache.flink.table.planner.calcite.{FlinkRexBuilder,
FlinkTypeFactory, FlinkTypeSystem}
import org.apache.flink.table.planner.plan.stats._
import org.apache.flink.table.planner.plan.utils.ColumnIntervalUtil._
-import org.junit.Assert.{assertEquals, assertTrue}
+import org.apache.calcite.rex.RexBuilder
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.junit.Assert.{assertEquals, assertNull}
import org.junit.Test
import java.lang
@@ -189,4 +193,85 @@ class ColumnIntervalUtilTest {
)
}
+ @Test
+ def testGetColumnIntervalWithFilter(): Unit = {
+ val typeFactory: FlinkTypeFactory = new FlinkTypeFactory(new
FlinkTypeSystem)
+ val rexBuilder: RexBuilder = new FlinkRexBuilder(typeFactory)
+
+ // ($1 >= 1 and $1 < 10) or (not($1 > 5)
+ val predicate = rexBuilder.makeCall(
+ SqlStdOperatorTable.OR,
+ rexBuilder.makeCall(
+ SqlStdOperatorTable.AND,
+ rexBuilder.makeCall(
+ SqlStdOperatorTable.GREATER_THAN_OR_EQUAL,
+
rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.BIGINT), 1),
+ rexBuilder.makeBigintLiteral(java.math.BigDecimal.valueOf(1))),
+ rexBuilder.makeCall(
+ SqlStdOperatorTable.LESS_THAN,
+
rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.BIGINT), 1),
+ rexBuilder.makeBigintLiteral(java.math.BigDecimal.valueOf(10)))),
+ rexBuilder.makeCall(
+ SqlStdOperatorTable.NOT,
+ rexBuilder.makeCall(
+ SqlStdOperatorTable.GREATER_THAN,
+
rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.BIGINT), 1),
+ rexBuilder.makeBigintLiteral(java.math.BigDecimal.valueOf(5))))
+ )
+
+ assertEquals(
+ toBigDecimalInterval(ValueInterval.apply(null, 10L, includeUpper =
false)),
+ ColumnIntervalUtil.getColumnIntervalWithFilter(
+ None,
+ predicate,
+ 1,
+ rexBuilder))
+
+ assertEquals(
+ toBigDecimalInterval(ValueInterval.apply(3L, 8L, includeLower = false,
includeUpper = false)),
+ ColumnIntervalUtil.getColumnIntervalWithFilter(
+ Some(toBigDecimalInterval(
+ ValueInterval.apply(3L, 8L, includeLower = false, includeUpper =
false))),
+ predicate,
+ 1,
+ rexBuilder))
+
+ assertEquals(
+ ValueInterval.empty,
+ ColumnIntervalUtil.getColumnIntervalWithFilter(
+ None,
+ rexBuilder.makeLiteral(false),
+ 0,
+ rexBuilder))
+
+ assertEquals(
+ ValueInterval.empty,
+ ColumnIntervalUtil.getColumnIntervalWithFilter(
+ Some(ValueInterval.apply(1L, 10L)),
+ rexBuilder.makeLiteral(false),
+ 0,
+ rexBuilder))
+
+ assertNull(
+ ColumnIntervalUtil.getColumnIntervalWithFilter(
+ None,
+ rexBuilder.makeLiteral(true),
+ 0,
+ rexBuilder))
+
+ assertEquals(
+ ValueInterval.apply(1L, 10L),
+ ColumnIntervalUtil.getColumnIntervalWithFilter(
+ Some(ValueInterval.apply(1L, 10L)),
+ rexBuilder.makeLiteral(true),
+ 0,
+ rexBuilder))
+
+ assertNull(
+ ColumnIntervalUtil.getColumnIntervalWithFilter(
+ None,
+ rexBuilder.makeBigintLiteral(java.math.BigDecimal.ONE),
+ 0,
+ rexBuilder))
+ }
}