yunfengzhou-hub commented on code in PR #6728:
URL: https://github.com/apache/paimon/pull/6728#discussion_r2605717080


##########
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java:
##########


Review Comment:
   Let's also update the description of the configuration `scan.partitions` to
cover the following information:
   
   * `max_pt()` can also be used on scan sources (or "normal sources", if that
term is more proper), because Flink's batch optimizer may rewrite a lookup join
into a regular join. However, it is not recommended for streaming jobs, as the
resolved max partition will not be updated while the job runs.
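   
   For reference, a rough sketch of how the extended wording might look,
assuming the option is declared through the usual `ConfigOptions` builder (the
class holding the option and its existing description text are illustrative
here, not copied from the PR):
   
   ```java
   // Hypothetical sketch; extend the real option's existing description
   // rather than replacing it.
   public static final ConfigOption<String> SCAN_PARTITIONS =
           ConfigOptions.key("scan.partitions")
                   .stringType()
                   .noDefaultValue()
                   .withDescription(
                           "Specify the partitions to scan, e.g. 'pt=a' or 'max_pt()'. "
                                   + "Note: 'max_pt()' also works on scan (normal) sources in "
                                   + "Flink batch jobs, because lookup joins may be optimized "
                                   + "into regular joins there. It is not recommended for "
                                   + "streaming jobs, as the resolved max partition will not "
                                   + "be updated while the job runs.");
   ```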



##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java:
##########
@@ -135,45 +139,59 @@ public Result applyFilters(List<ResolvedExpression> filters) {
         return Result.of(filters, unConsumedFilters);
     }
 
+    private PartitionPredicate getPartitionPredicate() {

Review Comment:
   The methods `getPartitionPredicate` and `getPartitionPredicateWithOptions`
do not seem to differ in how they handle options: `getPartitionPredicateWithOptions`
is only invoked by `getPartitionPredicate`, and `getPartitionPredicate` now
contains code that was previously located in `getPartitionPredicateWithOptions`.
It might be better to merge the two methods back together to simplify the
commit history, or to rename them so that the names reflect their actual
difference.
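   
   For illustration, a minimal sketch of the merge being suggested, with a
placeholder type standing in for Paimon's `PartitionPredicate` (this is not
the actual code from the PR):
   
   ```java
   class PartitionPredicateExample {
       // Placeholder standing in for Paimon's PartitionPredicate.
       static class PartitionPredicate {
           final String spec;
           PartitionPredicate(String spec) { this.spec = spec; }
       }
   
       private final String scanPartitions = "pt=max_pt()";
   
       // Before: getPartitionPredicate() merely delegates to a helper
       // that has no other callers.
       private PartitionPredicate getPartitionPredicateDelegating() {
           return getPartitionPredicateWithOptions();
       }
   
       private PartitionPredicate getPartitionPredicateWithOptions() {
           return new PartitionPredicate(scanPartitions);
       }
   
       // After: the helper's body is folded back in; behavior is
       // unchanged and the redundant indirection disappears.
       private PartitionPredicate getPartitionPredicate() {
           return new PartitionPredicate(scanPartitions);
       }
   }
   ```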



##########
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java:
##########
@@ -902,6 +902,46 @@ public void testScanWithSpecifiedPartitionsWithFieldMapping() {
         assertThat(sql(query)).containsExactly(Row.of(1, 11), Row.of(1, 12), Row.of(2, 22));
     }
 
+    @Test
+    public void testScanWithSpecifiedPartitionsWithMaxPt() {
+        sql("CREATE TABLE P (id INT, v INT, pt STRING) PARTITIONED BY (pt)");
+        sql("CREATE TABLE Q (id INT)");
+        sql(
+                "INSERT INTO P VALUES (1, 10, 'a'), (2, 20, 'a'), (1, 11, 'b'), (3, 31, 'b'), (1, 12, 'c'), (2, 22, 'c'), (3, 32, 'c')");
+        sql("INSERT INTO Q VALUES (1), (2), (3)");
+        String query =
+                "SELECT Q.id, P.v FROM Q INNER JOIN P /*+ OPTIONS('scan.partitions' = 'max_pt()') */ ON Q.id = P.id ORDER BY Q.id, P.v";
+        assertThat(sql(query)).containsExactly(Row.of(1, 12), Row.of(2, 22), Row.of(3, 32));
+    }
+
+    @Test
+    public void testScanWithSpecifiedPartitionsWithMaxTwoPt() {
+        sql("CREATE TABLE P (id INT, v INT, pt STRING) PARTITIONED BY (pt)");
+        sql("CREATE TABLE Q (id INT)");
+        sql(
+                "INSERT INTO P VALUES (1, 10, 'a'), (2, 20, 'a'), (1, 11, 'b'), (3, 31, 'b'), (1, 12, 'c'), (2, 22, 'c'), (3, 32, 'c')");
+        sql("INSERT INTO Q VALUES (1), (2), (3)");
+        String query =
+                "SELECT Q.id, P.v FROM Q INNER JOIN P /*+ OPTIONS('scan.partitions' = 'max_two_pt()') */ ON Q.id = P.id ORDER BY Q.id, P.v";
+        assertThat(sql(query))
+                .containsExactly(
+                        Row.of(1, 11), Row.of(1, 12), Row.of(2, 22), Row.of(3, 31), Row.of(3, 32));
+    }
+
+    @Test
+    public void testScanWithSpecifiedPartitionsWithLevelMaxPt() throws Exception {
+        sql(
+                "CREATE TABLE P (id INT, v INT, pt1 STRING, pt2 STRING, pt3 STRING) PARTITIONED BY (pt1, pt2, pt3)");
+        sql("CREATE TABLE Q (id INT)");
+        sql(
+                "INSERT INTO P VALUES (1, 10, 'a', '2025-10-01', '1'), (2, 20, 'a', '2025-10-01', '2'), (3, 30, 'a', '2025-10-02', '1'), (4, 40, 'a', '2025-10-02', '2'), "
+                        + "(1, 11, 'b', '2025-10-01', '1'), (2, 21, 'b', '2025-10-01', '2'), (3, 31, 'b', '2025-10-02', '1'), (4, 41, 'b', '2025-10-02', '2')");
+        sql("INSERT INTO Q VALUES (1), (2), (3), (4)");
+        String query =
+                "SELECT Q.id, P.v FROM Q INNER JOIN P /*+ OPTIONS('scan.partitions' = 'pt1=max_pt(),pt2=max_pt()') */ ON Q.id = P.id ORDER BY Q.id, P.v";

Review Comment:
   According to offline discussions, batch job users originally write these
queries as lookup joins, which Flink then optimizes into inner joins in batch
mode. How about writing the queries in these test cases as lookup joins
directly, so that the tests also cover Flink's join optimization logic?
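   
   For example, a rough sketch (untested) of how the first test's query could
be phrased as a lookup join, assuming a processing-time attribute is added to
the probe side per Flink's temporal join syntax:
   
   ```java
   // Sketch only: give Q a processing-time attribute and use the
   // FOR SYSTEM_TIME AS OF lookup-join syntax, which Flink's batch
   // planner may then rewrite into a regular join.
   sql("CREATE TABLE Q (id INT, proc_time AS PROCTIME())");
   String query =
           "SELECT Q.id, P.v FROM Q INNER JOIN "
                   + "P /*+ OPTIONS('scan.partitions' = 'max_pt()') */ "
                   + "FOR SYSTEM_TIME AS OF Q.proc_time "
                   + "ON Q.id = P.id ORDER BY Q.id, P.v";
   ```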



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
