swuferhong commented on code in PR #21622:
URL: https://github.com/apache/flink/pull/21622#discussion_r1104229887
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala:
##########
@@ -209,7 +209,7 @@ object RexNodeExtractor extends Logging {
val (partitionPredicates, nonPartitionPredicates) =
conjunctions.partition(isSupportedPartitionPredicate(_,
partitionFieldNames, inputFieldNames))
- (partitionPredicates, nonPartitionPredicates)
+ (partitionPredicates.filter(p => RexUtil.isDeterministic(p)),
nonPartitionPredicates)
Review Comment:
I think a better way to implement this is to check `isDeterministic()` inside
`isSupportedPartitionPredicate` instead of adding a filter after computing
`partitionPredicates`. RexNodes that are non-deterministic need to be added
to the list of `nonPartitionPredicates` rather than being dropped by the filter.
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java:
##########
@@ -1219,14 +1219,14 @@ public void applyPartitions(List<Map<String, String>>
remainingPartitions) {
} else {
// we will read data from Collections.emptyList() if
allPartitions is empty.
// therefore, we should clear all data manually.
- remainingPartitions = (List<Map<String, String>>)
Collections.emptyMap();
+ remainingPartitions = Collections.emptyList();
this.data.put(Collections.emptyMap(),
Collections.emptyList());
Review Comment:
Would this cast affect the final result of the tests? Or do you think it is
meaningless?
##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/PartitionableSourceTest.scala:
##########
@@ -148,6 +148,16 @@ class PartitionableSourceTest(val sourceFetchPartitions:
Boolean, val useCatalog
"select name from PartitionableAndFilterableTable " +
"where part1 = 'A' and part2 > 1 and id > 1")
}
+
+ @Test
+ def testRandCondition(): Unit = {
+ util.verifyRelPlan("SELECT * FROM PartitionableTable WHERE rand(1) <
0.001")
+ }
+
+ @Test
+ def testRandCondition2(): Unit = {
+ util.verifyRelPlan("SELECT * FROM PartitionableTable WHERE rand(part2) <
0.001")
+ }
Review Comment:
These are the same as the tests you added in
`PushPartitionIntoLegacyTableSourceScanRuleTest`. I think there is no need to
add them here as well. WDYT?
##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala:
##########
@@ -721,6 +722,70 @@ class RexNodeExtractorTest extends RexNodeTestBase {
}
}
+ @Test
+ def testExtractPartitionPredicateList(): Unit = {
+ val doubleType: RelDataType =
+ typeFactory.createFieldTypeFromLogicalType(new DoubleType(false))
+
+ val decimalType: RelDataType =
+ typeFactory.createFieldTypeFromLogicalType(new DecimalType(4, 3))
+
+ val rand =
+ rexBuilder.makeCall(FlinkSqlOperatorTable.RAND,
rexBuilder.makeLiteral(321, doubleType, true))
+ val c1 = rexBuilder.makeCall(
+ SqlStdOperatorTable.LESS_THAN_OR_EQUAL,
+ rand,
+ rexBuilder.makeLiteral(new BigDecimal(0.001), decimalType, true))
+ val (p, nonP) = RexNodeExtractor.extractPartitionPredicateList(
+ c1,
+ -1,
+ Array("a", "b", "p"),
+ rexBuilder,
+ Array("p"))
+ Assert.assertTrue(p.isEmpty)
+ Assert.assertTrue(nonP.nonEmpty)
+ }
+
+ @Test
+ def testExtractPartitionPredicateList2(): Unit = {
+ val stringType = typeFactory.createFieldTypeFromLogicalType(new
VarCharType())
+
+ val partition = rexBuilder.makeLiteral("1984")
+ val inputRef = rexBuilder.makeInputRef(stringType, 2)
+ val c1 = rexBuilder.makeCall(SqlStdOperatorTable.EQUALS, inputRef,
partition)
+ val (p, nonP) = RexNodeExtractor.extractPartitionPredicateList(
+ c1,
+ -1,
+ Array("a", "b", "p"),
+ rexBuilder,
+ Array("p"))
+ Assert.assertTrue(p.nonEmpty)
+ Assert.assertTrue(nonP.isEmpty)
+ }
+
+ @Test
+ def testExtractPartitionPredicateList3(): Unit = {
+ val intType = typeFactory.createFieldTypeFromLogicalType(new IntType())
+ val decimalType: RelDataType =
+ typeFactory.createFieldTypeFromLogicalType(new DecimalType(4, 3))
+
+ val inputRef = rexBuilder.makeInputRef(intType, 2)
+ val rand =
+ rexBuilder.makeCall(FlinkSqlOperatorTable.RAND, inputRef)
+ val c1 = rexBuilder.makeCall(
+ SqlStdOperatorTable.LESS_THAN_OR_EQUAL,
+ rand,
+ rexBuilder.makeLiteral(new BigDecimal(0.001), decimalType, true))
+ val (p, nonP) = RexNodeExtractor.extractPartitionPredicateList(
+ c1,
+ -1,
+ Array("a", "b", "p"),
+ rexBuilder,
+ Array("p"))
+ Assert.assertTrue(p.isEmpty)
+ Assert.assertTrue(nonP.isEmpty)
+ }
Review Comment:
Can these be combined into one test? Roughly speaking, these tests share
similar logic.
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala:
##########
@@ -271,6 +271,12 @@ object RexNodeExtractor extends Logging {
}
}
+ val inputRefFinder = new InputRefVisitor
+ predicate.accept(inputRefFinder)
+ // if no fields reached, it's not partition condition
+ if (inputRefFinder.getFields.length == 0) {
+ return false
+ }
try {
Review Comment:
There is no need to create a new `InputRefVisitor`. I think you should handle
this in `isSupportedPartitionPredicate.visitor`, changing the logic of
`visitor.visitInputRef` and overriding `visitRexCall` if needed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]