This is an automated email from the ASF dual-hosted git repository.
biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new aa19ef63b [spark] Reshuffle splits in PaimonSplitScan (#3429)
aa19ef63b is described below
commit aa19ef63b27434cdad90dace2988fa34b55bd29c
Author: Zouxxyy <[email protected]>
AuthorDate: Thu May 30 11:26:18 2024 +0800
[spark] Reshuffle splits in PaimonSplitScan (#3429)
---
.../scala/org/apache/paimon/spark/PaimonSplitScan.scala | 13 ++++++++++---
.../src/main/scala/org/apache/paimon/spark/ScanHelper.scala | 11 +++++++----
2 files changed, 17 insertions(+), 7 deletions(-)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala
index 37991d0be..b4e95f087 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala
@@ -18,6 +18,7 @@
package org.apache.paimon.spark
+import org.apache.paimon.CoreOptions
import org.apache.paimon.spark.schema.PaimonMetadataColumn
import org.apache.paimon.table.Table
import org.apache.paimon.table.source.{DataSplit, Split}
@@ -30,11 +31,17 @@ case class PaimonSplitScan(
table: Table,
dataSplits: Array[DataSplit],
metadataColumns: Seq[PaimonMetadataColumn] = Seq.empty)
- extends Scan {
+ extends Scan
+ with ScanHelper {
+
+ override val coreOptions: CoreOptions = CoreOptions.fromMap(table.options())
+
override def readSchema(): StructType =
SparkTypeUtils.fromPaimonRowType(table.rowType())
override def toBatch: Batch = {
- PaimonBatch(dataSplits.asInstanceOf[Array[Split]], table.newReadBuilder,
metadataColumns)
+ PaimonBatch(
+ reshuffleSplits(dataSplits.asInstanceOf[Array[Split]]),
+ table.newReadBuilder,
+ metadataColumns)
}
-
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala
index 8074322ea..96c1078ed 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala
@@ -20,14 +20,15 @@ package org.apache.paimon.spark
import org.apache.paimon.CoreOptions
import org.apache.paimon.io.DataFileMeta
-import org.apache.paimon.table.source.{DataSplit, DeletionFile, RawFile, Split}
+import org.apache.paimon.table.source.{DataSplit, DeletionFile, Split}
+import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
-trait ScanHelper {
+trait ScanHelper extends Logging {
private val spark = SparkSession.active
@@ -45,12 +46,15 @@ trait ScanHelper {
def reshuffleSplits(splits: Array[Split]): Array[Split] = {
if (splits.length < leafNodeDefaultParallelism) {
+ val beforeLength = splits.length
val (toReshuffle, reserved) = splits.partition {
case split: DataSplit => split.beforeFiles().isEmpty &&
split.convertToRawFiles.isPresent
case _ => false
}
val reshuffled = reshuffleSplits0(toReshuffle.collect { case ds:
DataSplit => ds })
- reshuffled ++ reserved
+ val all = reshuffled ++ reserved
+ logInfo(s"Reshuffle splits from $beforeLength to ${all.length}")
+ all
} else {
splits
}
@@ -134,5 +138,4 @@ trait ScanHelper {
Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
}
-
}