yunfengzhou-hub commented on code in PR #6728:
URL: https://github.com/apache/paimon/pull/6728#discussion_r2605717080
##########
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java:
##########
Review Comment:
Let's also update the description of the configuration `scan.partitions` to
mention the following:
* `max_pt` can also be used on scan sources (or "normal sources", if that is
the more accurate term), because Flink's batch optimizer rewrites lookup joins
into regular joins for batch jobs. However, it is not recommended to use it
this way in streaming jobs, as the resolved `max_pt` partition will not be
updated afterwards.
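For illustration, a minimal sketch of the batch-only usage the description
could call out, written in the style of the tests below (the table name `P`
and the hint value are just examples):

```java
// In a batch job, max_pt() is resolved when the scan is planned, so the hint
// also works on a plain scan source. In a streaming job the same hint would
// keep reading whichever partition was the maximum at startup and never pick
// up newer partitions, which is why it is not recommended there.
sql("SELECT * FROM P /*+ OPTIONS('scan.partitions' = 'max_pt()') */");
```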
##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java:
##########
@@ -135,45 +139,59 @@ public Result applyFilters(List<ResolvedExpression> filters) {
         return Result.of(filters, unConsumedFilters);
     }
+    private PartitionPredicate getPartitionPredicate() {
Review Comment:
The methods `getPartitionPredicate` and `getPartitionPredicateWithOptions` do
not seem to differ by options: `getPartitionPredicateWithOptions` is only
invoked by `getPartitionPredicate`, and `getPartitionPredicate` contains code
that was previously located in `getPartitionPredicateWithOptions`. It might
therefore be better to merge the two methods back into one to simplify the
commit history, or to rename them so the names reflect how they actually
differ.
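To make the "merge back" option concrete, a rough before/after sketch with
placeholder types (this is not the PR's actual code; `PartitionPredicate`
here is a stand-in interface, not the Paimon class):

```java
// Placeholder sketch of the suggested merge, not Paimon's actual classes.
final class FlinkTableSourceSketch {

    /** Stand-in for the real PartitionPredicate type. */
    interface PartitionPredicate {
        boolean test(String partition);
    }

    private final String scanPartitions; // value of 'scan.partitions', may be null

    FlinkTableSourceSketch(String scanPartitions) {
        this.scanPartitions = scanPartitions;
    }

    // Before the merge, the entry point only forwarded to a single-caller helper:
    //     PartitionPredicate getPartitionPredicate() {
    //         return getPartitionPredicateWithOptions();
    //     }
    //
    // After the merge, one method carries the whole flow, so the name matches
    // what the code does:
    PartitionPredicate getPartitionPredicate() {
        if (scanPartitions == null) {
            return null; // nothing configured, no partition filtering
        }
        // placeholder predicate; the real code would parse the spec and
        // resolve functions such as max_pt()
        return partition -> partition.equals(scanPartitions);
    }
}
```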
##########
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java:
##########
@@ -902,6 +902,46 @@ public void testScanWithSpecifiedPartitionsWithFieldMapping() {
         assertThat(sql(query)).containsExactly(Row.of(1, 11), Row.of(1, 12), Row.of(2, 22));
     }
+    @Test
+    public void testScanWithSpecifiedPartitionsWithMaxPt() {
+        sql("CREATE TABLE P (id INT, v INT, pt STRING) PARTITIONED BY (pt)");
+        sql("CREATE TABLE Q (id INT)");
+        sql(
+                "INSERT INTO P VALUES (1, 10, 'a'), (2, 20, 'a'), (1, 11, 'b'), (3, 31, 'b'), (1, 12, 'c'), (2, 22, 'c'), (3, 32, 'c')");
+        sql("INSERT INTO Q VALUES (1), (2), (3)");
+        String query =
+                "SELECT Q.id, P.v FROM Q INNER JOIN P /*+ OPTIONS('scan.partitions' = 'max_pt()') */ ON Q.id = P.id ORDER BY Q.id, P.v";
+        assertThat(sql(query)).containsExactly(Row.of(1, 12), Row.of(2, 22), Row.of(3, 32));
+    }
+
+    @Test
+    public void testScanWithSpecifiedPartitionsWithMaxTwoPt() {
+        sql("CREATE TABLE P (id INT, v INT, pt STRING) PARTITIONED BY (pt)");
+        sql("CREATE TABLE Q (id INT)");
+        sql(
+                "INSERT INTO P VALUES (1, 10, 'a'), (2, 20, 'a'), (1, 11, 'b'), (3, 31, 'b'), (1, 12, 'c'), (2, 22, 'c'), (3, 32, 'c')");
+        sql("INSERT INTO Q VALUES (1), (2), (3)");
+        String query =
+                "SELECT Q.id, P.v FROM Q INNER JOIN P /*+ OPTIONS('scan.partitions' = 'max_two_pt()') */ ON Q.id = P.id ORDER BY Q.id, P.v";
+        assertThat(sql(query))
+                .containsExactly(
+                        Row.of(1, 11), Row.of(1, 12), Row.of(2, 22), Row.of(3, 31), Row.of(3, 32));
+    }
+
+    @Test
+    public void testScanWithSpecifiedPartitionsWithLevelMaxPt() throws Exception {
+        sql(
+                "CREATE TABLE P (id INT, v INT, pt1 STRING, pt2 STRING, pt3 STRING) PARTITIONED BY (pt1, pt2, pt3)");
+        sql("CREATE TABLE Q (id INT)");
+        sql(
+                "INSERT INTO P VALUES (1, 10, 'a', '2025-10-01', '1'), (2, 20, 'a', '2025-10-01', '2'), (3, 30, 'a', '2025-10-02', '1'), (4, 40, 'a', '2025-10-02', '2'), "
+                        + "(1, 11, 'b', '2025-10-01', '1'), (2, 21, 'b', '2025-10-01', '2'), (3, 31, 'b', '2025-10-02', '1'), (4, 41, 'b', '2025-10-02', '2')");
+        sql("INSERT INTO Q VALUES (1), (2), (3), (4)");
+        String query =
+                "SELECT Q.id, P.v FROM Q INNER JOIN P /*+ OPTIONS('scan.partitions' = 'pt1=max_pt(),pt2=max_pt()') */ ON Q.id = P.id ORDER BY Q.id, P.v";
Review Comment:
According to offline discussions, batch job users originally intend to use a
lookup join, which Flink then optimizes into an inner join in batch mode. How
about writing these test cases as lookup join queries directly, so the tests
also cover Flink's join optimization logic?
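For example, a rough sketch of the lookup-join form of the first test's query
(this is only an assumption of how the test could be written: the `Q2` table
with a processing-time column is hypothetical, and the hint placement and
expected rows would need to be verified against the actual planner behavior):

```java
// Hypothetical lookup-join variant of testScanWithSpecifiedPartitionsWithMaxPt:
// the probe table gets a processing-time column so the FOR SYSTEM_TIME AS OF
// syntax applies, and in batch mode the planner is expected to rewrite this
// into the inner join tested above.
sql("CREATE TABLE Q2 (id INT, proc_time AS PROCTIME())");
sql("INSERT INTO Q2 VALUES (1), (2), (3)");
String lookupQuery =
        "SELECT Q2.id, D.v FROM Q2 JOIN P /*+ OPTIONS('scan.partitions' = 'max_pt()') */ "
                + "FOR SYSTEM_TIME AS OF Q2.proc_time AS D ON Q2.id = D.id ORDER BY Q2.id, D.v";
// Expected rows mirror the inner-join case above if the optimization applies.
assertThat(sql(lookupQuery)).containsExactly(Row.of(1, 12), Row.of(2, 22), Row.of(3, 32));
```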
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]