This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new fd6786f72 [core] Fix Limit push down sort splits violates its general
contract. (#4491)
fd6786f72 is described below
commit fd6786f7270bf45b099f5012dc17ad50b87f1c1a
Author: HunterXHunter <[email protected]>
AuthorDate: Sun Nov 10 23:17:04 2024 -0800
[core] Fix limit push-down: sorting splits violates the comparator's general contract.
(#4491)
---
.../paimon/table/source/DataTableBatchScan.java | 39 ++++-----------
.../paimon/spark/sql/PaimonPushDownTest.scala | 57 +++++++++++++++++++---
2 files changed, 61 insertions(+), 35 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
index d1d455040..d3e8a2adb 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
@@ -100,40 +100,21 @@ public class DataTableBatchScan extends
AbstractDataTableScan {
return result;
}
- // We first add the rawConvertible split to avoid merging, and if
the row count
- // is still less than limit number, then add split which is not
rawConvertible.
- splits.sort(
- (x, y) -> {
- if (x.rawConvertible() && y.rawConvertible()) {
- return 0;
- } else if (x.rawConvertible()) {
- return -1;
- } else {
- return 1;
- }
- });
-
- // fast return if there is no rawConvertible split
- if (!splits.get(0).rawConvertible()) {
- return result;
- }
-
List<Split> limitedSplits = new ArrayList<>();
for (DataSplit dataSplit : splits) {
- long splitRowCount = getRowCountForSplit(dataSplit);
- limitedSplits.add(dataSplit);
- scannedRowCount += splitRowCount;
- if (scannedRowCount >= pushDownLimit) {
- break;
+ if (dataSplit.rawConvertible()) {
+ long splitRowCount = getRowCountForSplit(dataSplit);
+ limitedSplits.add(dataSplit);
+ scannedRowCount += splitRowCount;
+ if (scannedRowCount >= pushDownLimit) {
+ SnapshotReader.Plan newPlan =
+ new PlanImpl(plan.watermark(),
plan.snapshotId(), limitedSplits);
+ return new ScannedResult(newPlan);
+ }
}
}
-
- SnapshotReader.Plan newPlan =
- new PlanImpl(plan.watermark(), plan.snapshotId(),
limitedSplits);
- return new ScannedResult(newPlan);
- } else {
- return result;
}
+ return result;
}
/**
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTest.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTest.scala
index 59968b555..ba314e3af 100644
---
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTest.scala
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTest.scala
@@ -141,27 +141,72 @@ class PaimonPushDownTest extends PaimonSparkTestBase {
val scanBuilder = getScanBuilder()
Assertions.assertTrue(scanBuilder.isInstanceOf[SupportsPushDownLimit])
+ // Case 1: All dataSplits are rawConvertible.
val dataSplitsWithoutLimit =
scanBuilder.build().asInstanceOf[PaimonScan].getOriginSplits
Assertions.assertEquals(4, dataSplitsWithoutLimit.length)
+ // All dataSplits are rawConvertible.
+ dataSplitsWithoutLimit.foreach(
+ splits => {
+ Assertions.assertTrue(splits.asInstanceOf[DataSplit].rawConvertible())
+ })
- // It still return false even it can push down limit.
+ // It still returns false even it can push down limit.
Assertions.assertFalse(scanBuilder.asInstanceOf[SupportsPushDownLimit].pushLimit(1))
-
val dataSplitsWithLimit =
scanBuilder.build().asInstanceOf[PaimonScan].getOriginSplits
Assertions.assertEquals(1, dataSplitsWithLimit.length)
-
Assertions.assertEquals(1, spark.sql("SELECT * FROM T LIMIT 1").count())
- spark.sql("UPDATE T SET b = 'x' WHERE a = 1")
+
Assertions.assertFalse(scanBuilder.asInstanceOf[SupportsPushDownLimit].pushLimit(2))
+ val dataSplitsWithLimit1 =
scanBuilder.build().asInstanceOf[PaimonScan].getOriginSplits
+ Assertions.assertEquals(2, dataSplitsWithLimit1.length)
+ Assertions.assertEquals(2, spark.sql("SELECT * FROM T LIMIT 2").count())
+ // Case 2: Update 2 rawConvertible dataSplits to make them nonRawConvertible.
+ spark.sql("INSERT INTO T VALUES (1, 'a2', '11'), (2, 'b2', '22')")
val scanBuilder2 = getScanBuilder()
val dataSplitsWithoutLimit2 =
scanBuilder2.build().asInstanceOf[PaimonScan].getOriginSplits
Assertions.assertEquals(4, dataSplitsWithoutLimit2.length)
-
+ // Now, we have 4 dataSplits: 2 dataSplits are nonRawConvertible and 2 dataSplits are rawConvertible.
+ Assertions.assertEquals(
+ 2,
+ dataSplitsWithoutLimit2
+ .filter(
+ split => {
+ split.asInstanceOf[DataSplit].rawConvertible()
+ })
+ .length)
+
+ // Return 2 dataSplits.
+
Assertions.assertFalse(scanBuilder2.asInstanceOf[SupportsPushDownLimit].pushLimit(2))
val dataSplitsWithLimit2 =
scanBuilder2.build().asInstanceOf[PaimonScan].getOriginSplits
- Assertions.assertEquals(4, dataSplitsWithLimit2.length)
+ Assertions.assertEquals(2, dataSplitsWithLimit2.length)
+ Assertions.assertEquals(2, spark.sql("SELECT * FROM T LIMIT 2").count())
+ // 2 dataSplits cannot meet the limit requirement, so we need to scan all dataSplits.
+
Assertions.assertFalse(scanBuilder2.asInstanceOf[SupportsPushDownLimit].pushLimit(3))
+ val dataSplitsWithLimit22 =
scanBuilder2.build().asInstanceOf[PaimonScan].getOriginSplits
+ // Need to scan all dataSplits.
+ Assertions.assertEquals(4, dataSplitsWithLimit22.length)
+ Assertions.assertEquals(3, spark.sql("SELECT * FROM T LIMIT 3").count())
+
+ // Case 3: Update the remaining 2 rawConvertible dataSplits so that all dataSplits are nonRawConvertible.
+ spark.sql("INSERT INTO T VALUES (3, 'c', '11'), (4, 'd', '22')")
+ val scanBuilder3 = getScanBuilder()
+ val dataSplitsWithoutLimit3 =
scanBuilder3.build().asInstanceOf[PaimonScan].getOriginSplits
+ Assertions.assertEquals(4, dataSplitsWithoutLimit3.length)
+
+ // All dataSplits are nonRawConvertible.
+ dataSplitsWithoutLimit3.foreach(
+ splits => {
+ Assertions.assertFalse(splits.asInstanceOf[DataSplit].rawConvertible())
+ })
+
+
Assertions.assertFalse(scanBuilder3.asInstanceOf[SupportsPushDownLimit].pushLimit(1))
+ val dataSplitsWithLimit3 =
scanBuilder3.build().asInstanceOf[PaimonScan].getOriginSplits
+ // Need to scan all dataSplits.
+ Assertions.assertEquals(4, dataSplitsWithLimit3.length)
Assertions.assertEquals(1, spark.sql("SELECT * FROM T LIMIT 1").count())
+
}
test("Paimon pushDown: runtime filter") {