This is an automated email from the ASF dual-hosted git repository.

biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new aa19ef63b [spark] Reshuffle splits in PaimonSplitScan (#3429)
aa19ef63b is described below

commit aa19ef63b27434cdad90dace2988fa34b55bd29c
Author: Zouxxyy <[email protected]>
AuthorDate: Thu May 30 11:26:18 2024 +0800

    [spark] Reshuffle splits in PaimonSplitScan (#3429)
---
 .../scala/org/apache/paimon/spark/PaimonSplitScan.scala     | 13 ++++++++++---
 .../src/main/scala/org/apache/paimon/spark/ScanHelper.scala | 11 +++++++----
 2 files changed, 17 insertions(+), 7 deletions(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala
index 37991d0be..b4e95f087 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.spark
 
+import org.apache.paimon.CoreOptions
 import org.apache.paimon.spark.schema.PaimonMetadataColumn
 import org.apache.paimon.table.Table
 import org.apache.paimon.table.source.{DataSplit, Split}
@@ -30,11 +31,17 @@ case class PaimonSplitScan(
     table: Table,
     dataSplits: Array[DataSplit],
     metadataColumns: Seq[PaimonMetadataColumn] = Seq.empty)
-  extends Scan {
+  extends Scan
+  with ScanHelper {
+
+  override val coreOptions: CoreOptions = CoreOptions.fromMap(table.options())
+
   override def readSchema(): StructType = SparkTypeUtils.fromPaimonRowType(table.rowType())
 
   override def toBatch: Batch = {
-    PaimonBatch(dataSplits.asInstanceOf[Array[Split]], table.newReadBuilder, metadataColumns)
+    PaimonBatch(
+      reshuffleSplits(dataSplits.asInstanceOf[Array[Split]]),
+      table.newReadBuilder,
+      metadataColumns)
   }
-
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala
index 8074322ea..96c1078ed 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala
@@ -20,14 +20,15 @@ package org.apache.paimon.spark
 
 import org.apache.paimon.CoreOptions
 import org.apache.paimon.io.DataFileMeta
-import org.apache.paimon.table.source.{DataSplit, DeletionFile, RawFile, Split}
+import org.apache.paimon.table.source.{DataSplit, DeletionFile, Split}
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
-trait ScanHelper {
+trait ScanHelper extends Logging {
 
   private val spark = SparkSession.active
 
@@ -45,12 +46,15 @@ trait ScanHelper {
 
   def reshuffleSplits(splits: Array[Split]): Array[Split] = {
     if (splits.length < leafNodeDefaultParallelism) {
+      val beforeLength = splits.length
       val (toReshuffle, reserved) = splits.partition {
         case split: DataSplit => split.beforeFiles().isEmpty && split.convertToRawFiles.isPresent
         case _ => false
       }
       val reshuffled = reshuffleSplits0(toReshuffle.collect { case ds: DataSplit => ds })
-      reshuffled ++ reserved
+      val all = reshuffled ++ reserved
+      logInfo(s"Reshuffle splits from $beforeLength to ${all.length}")
+      all
     } else {
       splits
     }
@@ -134,5 +138,4 @@ trait ScanHelper {
 
     Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
   }
-
 }

Reply via email to