xuyangzhong commented on a change in pull request #17652:
URL: https://github.com/apache/flink/pull/17652#discussion_r741632375



##########
File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
##########
@@ -176,13 +176,13 @@ object FlinkBatchRuleSets {
     * RuleSet to do push predicate/partition into table scan
     */
   val FILTER_TABLESCAN_PUSHDOWN_RULES: RuleSet = RuleSets.ofList(
-    // push a filter down into the table scan
-    PushFilterIntoTableSourceScanRule.INSTANCE,
-    PushFilterIntoLegacyTableSourceScanRule.INSTANCE,
     // push partition into the table scan
     PushPartitionIntoLegacyTableSourceScanRule.INSTANCE,
     // push partition into the dynamic table scan
-    PushPartitionIntoTableSourceScanRule.INSTANCE
+    PushPartitionIntoTableSourceScanRule.INSTANCE,

Review comment:
       My assumption is that the legacy code will be cleaned future, and so it 
will leave one rule each here. Now maybe to classify them separately is more 
clear.

##########
File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
##########
@@ -176,13 +176,13 @@ object FlinkBatchRuleSets {
     * RuleSet to do push predicate/partition into table scan
     */
   val FILTER_TABLESCAN_PUSHDOWN_RULES: RuleSet = RuleSets.ofList(
-    // push a filter down into the table scan
-    PushFilterIntoTableSourceScanRule.INSTANCE,
-    PushFilterIntoLegacyTableSourceScanRule.INSTANCE,
     // push partition into the table scan
     PushPartitionIntoLegacyTableSourceScanRule.INSTANCE,
     // push partition into the dynamic table scan
-    PushPartitionIntoTableSourceScanRule.INSTANCE
+    PushPartitionIntoTableSourceScanRule.INSTANCE,

Review comment:
       My assumption is that the legacy code will be cleaned in future, and so 
it will leave one rule each here. Now maybe to classify them separately is more 
clear.

##########
File path: 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/PartitionableSourceTest.scala
##########
@@ -76,30 +98,41 @@ class PartitionableSourceTest(
         val catalogPartitionSpec = new CatalogPartitionSpec(partition)
         val catalogPartition = new CatalogPartitionImpl(
           new java.util.HashMap[String, String](), "")
-        catalog.createPartition(mytablePath, catalogPartitionSpec, 
catalogPartition, true)
+        catalog.createPartition(
+          partitionableTablePath, catalogPartitionSpec, catalogPartition, true)
+        catalog.createPartition(
+          partitionableAndFilterableTablePath, catalogPartitionSpec, 
catalogPartition, true)
       })
     }
   }
 
   @Test
   def testSimplePartitionFieldPredicate1(): Unit = {
-    util.verifyExecPlan("SELECT * FROM MyTable WHERE part1 = 'A'")
+    util.verifyExecPlan("SELECT * FROM PartitionableTable WHERE part1 = 'A'")
   }
 
   @Test
   def testPartialPartitionFieldPredicatePushDown(): Unit = {
-    util.verifyExecPlan("SELECT * FROM MyTable WHERE (id > 2 OR part1 = 'A') 
AND part2 > 1")
+    util.verifyExecPlan(
+      "SELECT * FROM PartitionableTable WHERE (id > 2 OR part1 = 'A') AND 
part2 > 1")
   }
 
   @Test
   def testWithUdfAndVirtualColumn(): Unit = {
     util.addFunction("MyUdf", Func1)
-    util.verifyExecPlan("SELECT * FROM MyTable WHERE id > 2 AND MyUdf(part2) < 
3")
+    util.verifyExecPlan("SELECT * FROM PartitionableTable WHERE id > 2 AND 
MyUdf(part2) < 3")
   }
 
   @Test
   def testUnconvertedExpression(): Unit = {
-    util.verifyExecPlan("select * from MyTable where trim(part1) = 'A' and 
part2 > 1")
+    util.verifyExecPlan("select * from PartitionableTable where trim(part1) = 
'A' and part2 > 1")
+  }
+
+  @Test
+  def testPushDownPartitionAndFiltersContainPartitionKeys(): Unit = {
+    util.verifyExecPlan(
+      "select * from PartitionableAndFilterableTable " +

Review comment:
       Just like "select name from PartitionableAndFilterableTable ...(same 
with now codes)"?

##########
File path: 
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSourceJsonPlanTest_jsonplan/testPartitionPushDown.out
##########
@@ -64,7 +61,7 @@
         "b" : "INT"
       } ]
     },
-    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, PartitionTable, filter=[], partitions=[{p=A}], project=[a, 
b], metadata=[]]], fields=[a, b])",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, PartitionTable, partitions=[{p=A}], project=[a, b], 
metadata=[]]], fields=[a, b])",

Review comment:
       The specs will be print each by the order in which they added to the 
table source. Now PushDown Partitions will first and partitions info will be 
print first. I can add a test case about json plan to verify it when the 
filters and partitions are all exist in the table source.

##########
File path: 
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSourceJsonPlanTest_jsonplan/testPartitionPushDown.out
##########
@@ -64,7 +61,7 @@
         "b" : "INT"
       } ]
     },
-    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, PartitionTable, filter=[], partitions=[{p=A}], project=[a, 
b], metadata=[]]], fields=[a, b])",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, PartitionTable, partitions=[{p=A}], project=[a, b], 
metadata=[]]], fields=[a, b])",

Review comment:
       Sorry, I test some cases and find one bad case when the table source has 
a watermark. It will miss the rule in optimize period "predicate_pushdown" and 
match the "logical". So PushDownProject will be called first in "logical", and 
then PushDownWatermark will be called later in "logical_rewrite", and then 
PushDownFilter in "logical_rewrite". The source description just like :
   `TableSourceScan(table=[[default_catalog, default_database, TestTable, 
project=[a, b, c], metadata=[], watermark=[c], filter=[>(a, 1)]]], fields=[a, 
b, c])`
   Maybe we need to sort the specs in table source to print them in order if 
the order is important.
   

##########
File path: 
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSourceJsonPlanTest_jsonplan/testPartitionPushDown.out
##########
@@ -64,7 +61,7 @@
         "b" : "INT"
       } ]
     },
-    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, PartitionTable, filter=[], partitions=[{p=A}], project=[a, 
b], metadata=[]]], fields=[a, b])",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, PartitionTable, partitions=[{p=A}], project=[a, b], 
metadata=[]]], fields=[a, b])",

Review comment:
       Got it! The issue is here [FLINK-24754].




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to