xuyangzhong commented on a change in pull request #17652:
URL: https://github.com/apache/flink/pull/17652#discussion_r741632375
##########
File path:
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
##########
@@ -176,13 +176,13 @@ object FlinkBatchRuleSets {
* RuleSet to do push predicate/partition into table scan
*/
val FILTER_TABLESCAN_PUSHDOWN_RULES: RuleSet = RuleSets.ofList(
- // push a filter down into the table scan
- PushFilterIntoTableSourceScanRule.INSTANCE,
- PushFilterIntoLegacyTableSourceScanRule.INSTANCE,
// push partition into the table scan
PushPartitionIntoLegacyTableSourceScanRule.INSTANCE,
// push partition into the dynamic table scan
- PushPartitionIntoTableSourceScanRule.INSTANCE
+ PushPartitionIntoTableSourceScanRule.INSTANCE,
Review comment:
My assumption is that the legacy code will be cleaned up in the future, which
would leave only one rule of each kind here. For now, classifying them into
separate rule sets is perhaps clearer.
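For illustration, the separation could look roughly like the sketch below (the
`PARTITION_TABLESCAN_PUSHDOWN_RULES` name and the exact grouping are only a
suggestion, not what this PR actually does):

```scala
  /**
    * RuleSet to push filter predicates into the table scan.
    */
  val FILTER_TABLESCAN_PUSHDOWN_RULES: RuleSet = RuleSets.ofList(
    // push a filter down into the table scan
    PushFilterIntoTableSourceScanRule.INSTANCE,
    PushFilterIntoLegacyTableSourceScanRule.INSTANCE
  )

  /**
    * RuleSet to push partition pruning into the table scan.
    */
  val PARTITION_TABLESCAN_PUSHDOWN_RULES: RuleSet = RuleSets.ofList(
    // push partition into the legacy table scan
    PushPartitionIntoLegacyTableSourceScanRule.INSTANCE,
    // push partition into the dynamic table scan
    PushPartitionIntoTableSourceScanRule.INSTANCE
  )
```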
##########
File path:
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/PartitionableSourceTest.scala
##########
@@ -76,30 +98,41 @@ class PartitionableSourceTest(
val catalogPartitionSpec = new CatalogPartitionSpec(partition)
val catalogPartition = new CatalogPartitionImpl(
new java.util.HashMap[String, String](), "")
- catalog.createPartition(mytablePath, catalogPartitionSpec, catalogPartition, true)
+ catalog.createPartition(
+ partitionableTablePath, catalogPartitionSpec, catalogPartition, true)
+ catalog.createPartition(
+ partitionableAndFilterableTablePath, catalogPartitionSpec, catalogPartition, true)
})
}
}
@Test
def testSimplePartitionFieldPredicate1(): Unit = {
- util.verifyExecPlan("SELECT * FROM MyTable WHERE part1 = 'A'")
+ util.verifyExecPlan("SELECT * FROM PartitionableTable WHERE part1 = 'A'")
}
@Test
def testPartialPartitionFieldPredicatePushDown(): Unit = {
- util.verifyExecPlan("SELECT * FROM MyTable WHERE (id > 2 OR part1 = 'A')
AND part2 > 1")
+ util.verifyExecPlan(
+ "SELECT * FROM PartitionableTable WHERE (id > 2 OR part1 = 'A') AND
part2 > 1")
}
@Test
def testWithUdfAndVirtualColumn(): Unit = {
util.addFunction("MyUdf", Func1)
- util.verifyExecPlan("SELECT * FROM MyTable WHERE id > 2 AND MyUdf(part2) <
3")
+ util.verifyExecPlan("SELECT * FROM PartitionableTable WHERE id > 2 AND
MyUdf(part2) < 3")
}
@Test
def testUnconvertedExpression(): Unit = {
- util.verifyExecPlan("select * from MyTable where trim(part1) = 'A' and
part2 > 1")
+ util.verifyExecPlan("select * from PartitionableTable where trim(part1) =
'A' and part2 > 1")
+ }
+
+ @Test
+ def testPushDownPartitionAndFiltersContainPartitionKeys(): Unit = {
+ util.verifyExecPlan(
+ "select * from PartitionableAndFilterableTable " +
Review comment:
Just like "select name from PartitionableAndFilterableTable ...(same
with now codes)"?
##########
File path:
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/PartitionableSourceTest.scala
##########
@@ -55,17 +55,39 @@ class PartitionableSourceTest(
|)
|""".stripMargin
+ // test when PushDownFilter can consume all filters including fields partitionKeys
+ val partitionableAndFilterableTable =
+ """
+ |CREATE TABLE PartitionableAndFilterableTable (
+ | id int,
+ | name string,
+ | part1 string,
+ | part2 int,
+ | virtualField as part2 + 1)
Review comment:
I just want to keep this consistent with the existing code above.
##########
File path:
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSourceJsonPlanTest_jsonplan/testPartitionPushDown.out
##########
@@ -64,7 +61,7 @@
"b" : "INT"
} ]
},
- "description" : "TableSourceScan(table=[[default_catalog,
default_database, PartitionTable, filter=[], partitions=[{p=A}], project=[a,
b], metadata=[]]], fields=[a, b])",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, PartitionTable, partitions=[{p=A}], project=[a, b],
metadata=[]]], fields=[a, b])",
Review comment:
The specs are printed in the order in which they were added to the table
source. Currently partition pushdown runs first, so the partition info is
printed first. I can add a JSON plan test case that covers a table source with
both filters and partitions to verify this.
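To make the ordering concrete, here is a tiny self-contained sketch (the
`AbilitySpec`/`SourceDigest` names are made up, not the planner's real
classes); it only shows that whichever pushdown is applied first ends up first
in the printed description:

```scala
// Illustrative only: the printed description reflects insertion order.
case class AbilitySpec(digest: String)

class SourceDigest {
  private val specs = scala.collection.mutable.ListBuffer.empty[AbilitySpec]

  // each pushdown rule appends its spec when it matches
  def add(spec: AbilitySpec): Unit = specs += spec

  // the description concatenates specs in insertion order
  def describe(table: String): String =
    s"TableSourceScan(table=[[$table, ${specs.map(_.digest).mkString(", ")}]])"
}

object SpecOrderDemo extends App {
  val digest = new SourceDigest
  digest.add(AbilitySpec("partitions=[{p=A}]")) // partition pushdown matched first
  digest.add(AbilitySpec("project=[a, b]"))     // projection pushdown matched later
  println(digest.describe("default_catalog, default_database, PartitionTable"))
  // TableSourceScan(table=[[default_catalog, default_database, PartitionTable, partitions=[{p=A}], project=[a, b]]])
}
```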
##########
File path:
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSourceJsonPlanTest_jsonplan/testPartitionPushDown.out
##########
@@ -64,7 +61,7 @@
"b" : "INT"
} ]
},
- "description" : "TableSourceScan(table=[[default_catalog,
default_database, PartitionTable, filter=[], partitions=[{p=A}], project=[a,
b], metadata=[]]], fields=[a, b])",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, PartitionTable, partitions=[{p=A}], project=[a, b],
metadata=[]]], fields=[a, b])",
Review comment:
Sorry, I tested a few cases and found one bad case when the table source has a
watermark. It misses the rules in the "predicate_pushdown" optimization phase
and only matches in "logical". So PushDownProject is applied first in
"logical", then PushDownWatermark later in "logical_rewrite", and finally
PushDownFilter in "logical_rewrite". The source description then looks like:
`TableSourceScan(table=[[default_catalog, default_database, TestTable, project=[a, b, c], metadata=[], watermark=[c], filter=[>(a, 1)]]], fields=[a, b, c])`
Maybe we need to sort the specs in the table source so that they are printed
in a fixed order, if the order matters.
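As a rough illustration of the sorting idea (purely hypothetical names and
ranks, not an actual proposal for the planner's classes), the specs could be
ranked by kind and sorted before rendering, so the digest no longer depends on
which phase applied each rule:

```scala
// Hypothetical sketch: rank each spec kind and sort before printing.
case class SourceSpec(kind: String, digest: String)

object SpecOrdering {
  // assumed priority; the real ordering would need to be agreed on
  private val rank: Map[String, Int] =
    Map("partitions" -> 0, "filter" -> 1, "project" -> 2, "metadata" -> 3, "watermark" -> 4)
      .withDefaultValue(Int.MaxValue)

  def describe(specs: Seq[SourceSpec]): String =
    specs.sortBy(s => rank(s.kind)).map(_.digest).mkString(", ")
}

object SortedDigestDemo extends App {
  // application order in the watermark case: project, then watermark, then filter
  val applied = Seq(
    SourceSpec("project", "project=[a, b, c]"),
    SourceSpec("watermark", "watermark=[c]"),
    SourceSpec("filter", "filter=[>(a, 1)]"))
  println(SpecOrdering.describe(applied))
  // filter=[>(a, 1)], project=[a, b, c], watermark=[c]
}
```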
##########
File path:
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSourceJsonPlanTest_jsonplan/testPartitionPushDown.out
##########
@@ -64,7 +61,7 @@
"b" : "INT"
} ]
},
- "description" : "TableSourceScan(table=[[default_catalog,
default_database, PartitionTable, filter=[], partitions=[{p=A}], project=[a,
b], metadata=[]]], fields=[a, b])",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, PartitionTable, partitions=[{p=A}], project=[a, b],
metadata=[]]], fields=[a, b])",
Review comment:
Got it! The issue is here [FLINK-24754].
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]