This is an automated email from the ASF dual-hosted git repository.
biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d09d2aaec [spark] avoid empty PaimonInputPartition (#3754)
d09d2aaec is described below
commit d09d2aaec0ded34c3588f6f0cfb35296ce09b81e
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Jul 16 09:51:33 2024 +0800
[spark] avoid empty PaimonInputPartition (#3754)
---
.../scala/org/apache/paimon/spark/ScanHelper.scala | 2 +-
.../org/apache/paimon/spark/ScanHelperTest.scala | 26 ++++++++++++++++++++--
2 files changed, 25 insertions(+), 3 deletions(-)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala
index 4971de24c..a5d4f10ba 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala
@@ -85,7 +85,7 @@ trait ScanHelper extends Logging {
def closeInputPartition(): Unit = {
closeDataSplit()
- if (currentSplit.nonEmpty) {
+ if (currentSplits.nonEmpty) {
partitions += PaimonInputPartition(currentSplits.toArray)
}
currentSplits.clear()
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala
index 1a844d998..637113930 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala
@@ -26,7 +26,7 @@ import org.apache.paimon.table.source.{DataSplit, Split}
import org.junit.jupiter.api.Assertions
-import java.util.HashMap
+import java.util.{HashMap => JHashMap}
import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -65,9 +65,31 @@ class ScanHelperTest extends PaimonSparkTestBase {
}
}
+ test("Paimon: reshuffle one split") {
+ val files = List(
+ DataFileMeta.forAppend("f1.parquet", 750000, 30000, null, 0, 29999, 1,
FileSource.APPEND)
+ ).asJava
+
+ val dataSplits: Array[Split] = Array(
+ DataSplit
+ .builder()
+ .withSnapshot(1)
+ .withBucket(0)
+ .withPartition(BinaryRow.EMPTY_ROW)
+ .withDataFiles(files)
+ .rawConvertible(true)
+ .withBucketPath("no use")
+ .build()
+ )
+
+ val fakeScan = new FakeScan()
+ val reshuffled = fakeScan.getInputPartitions(dataSplits)
+ Assertions.assertEquals(1, reshuffled.length)
+ }
+
class FakeScan extends ScanHelper {
override val coreOptions: CoreOptions =
- CoreOptions.fromMap(new HashMap[String, String]())
+ CoreOptions.fromMap(new JHashMap[String, String]())
}
}