This is an automated email from the ASF dual-hosted git repository.

biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d09d2aaec [spark] avoid empty PaimonInputPartition (#3754)
d09d2aaec is described below

commit d09d2aaec0ded34c3588f6f0cfb35296ce09b81e
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Jul 16 09:51:33 2024 +0800

    [spark] avoid empty PaimonInputPartition (#3754)
---
 .../scala/org/apache/paimon/spark/ScanHelper.scala |  2 +-
 .../org/apache/paimon/spark/ScanHelperTest.scala   | 26 ++++++++++++++++++++--
 2 files changed, 25 insertions(+), 3 deletions(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala
index 4971de24c..a5d4f10ba 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala
@@ -85,7 +85,7 @@ trait ScanHelper extends Logging {
 
     def closeInputPartition(): Unit = {
       closeDataSplit()
-      if (currentSplit.nonEmpty) {
+      if (currentSplits.nonEmpty) {
         partitions += PaimonInputPartition(currentSplits.toArray)
       }
       currentSplits.clear()
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala
index 1a844d998..637113930 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala
@@ -26,7 +26,7 @@ import org.apache.paimon.table.source.{DataSplit, Split}
 
 import org.junit.jupiter.api.Assertions
 
-import java.util.HashMap
+import java.util.{HashMap => JHashMap}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -65,9 +65,31 @@ class ScanHelperTest extends PaimonSparkTestBase {
     }
   }
 
+  test("Paimon: reshuffle one split") {
+    val files = List(
+      DataFileMeta.forAppend("f1.parquet", 750000, 30000, null, 0, 29999, 1, FileSource.APPEND)
+    ).asJava
+
+    val dataSplits: Array[Split] = Array(
+      DataSplit
+        .builder()
+        .withSnapshot(1)
+        .withBucket(0)
+        .withPartition(BinaryRow.EMPTY_ROW)
+        .withDataFiles(files)
+        .rawConvertible(true)
+        .withBucketPath("no use")
+        .build()
+    )
+
+    val fakeScan = new FakeScan()
+    val reshuffled = fakeScan.getInputPartitions(dataSplits)
+    Assertions.assertEquals(1, reshuffled.length)
+  }
+
   class FakeScan extends ScanHelper {
     override val coreOptions: CoreOptions =
-      CoreOptions.fromMap(new HashMap[String, String]())
+      CoreOptions.fromMap(new JHashMap[String, String]())
   }
 
 }

Reply via email to