This is an automated email from the ASF dual-hosted git repository.

zouxxyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b8c3932a48 [spark] Refactor BaseScan and PaimonStatistics (#6833)
b8c3932a48 is described below

commit b8c3932a483dc43660f4211c83cbde4ced956b06
Author: Zouxxyy <[email protected]>
AuthorDate: Fri Dec 19 10:12:45 2025 +0800

    [spark] Refactor BaseScan and PaimonStatistics (#6833)
---
 .../paimon/spark/PaimonFormatTableScan.scala       |   6 +-
 .../scala/org/apache/paimon/spark/PaimonScan.scala |   3 +-
 .../paimon/spark/statistics/StatisticsHelper.scala |  30 ------
 .../scala/org/apache/paimon/spark/PaimonScan.scala |   8 +-
 .../paimon/spark/statistics/StatisticsHelper.scala |  30 ------
 .../paimon/spark/FormatTableStatistics.scala       |  57 -----------
 .../org/apache/paimon/spark/PaimonBaseScan.scala   |  66 +++++--------
 .../paimon/spark/PaimonFormatTableBaseScan.scala   |  53 +++-------
 .../paimon/spark/PaimonPartitionReader.scala       |  12 +--
 .../scala/org/apache/paimon/spark/PaimonScan.scala |  27 ++----
 .../apache/paimon/spark/PaimonScanBuilder.scala    |   1 +
 .../spark/PaimonSparkCopyOnWriteOperation.scala    |   1 +
 .../apache/paimon/spark/PaimonSparkTableBase.scala |   1 +
 .../DisableUnnecessaryPaimonBucketedScan.scala     |   2 +-
 .../BaseScan.scala}                                |  58 ++++++++---
 .../BinPackingSplits.scala}                        |  13 ++-
 .../spark/{ => scan}/PaimonCopyOnWriteScan.scala   |  41 +++-----
 .../paimon/spark/{ => scan}/PaimonLocalScan.scala  |   2 +-
 .../paimon/spark/{ => scan}/PaimonSplitScan.scala  |  31 +-----
 .../paimon/spark/{ => scan}/PaimonStatistics.scala |  87 ++++++++++-------
 .../spark/statistics/StatisticsHelperBase.scala    | 108 ---------------------
 .../SplitUtils.scala}                              |  30 ++++--
 .../paimon/spark/write/BaseV2WriteBuilder.scala    |   2 +-
 .../apache/paimon/spark/write/PaimonV2Write.scala  |   4 +-
 ...HelperTest.scala => BinPackingSplitsTest.scala} |  22 ++---
 .../paimon/spark/sql/DeletionVectorTest.scala      |   3 +-
 .../apache/paimon/spark/sql/PaimonMetricTest.scala |   7 +-
 .../paimon/spark/sql/PaimonPushDownTestBase.scala  |  24 ++---
 .../spark/sql/SparkV2FilterConverterTestBase.scala |   2 +-
 .../paimon/spark/util/ScanPlanHelperTest.scala     |   2 +-
 30 files changed, 235 insertions(+), 498 deletions(-)

diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonFormatTableScan.scala b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonFormatTableScan.scala
index e9734d238b..467172401b 100644
--- a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonFormatTableScan.scala
+++ b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonFormatTableScan.scala
@@ -38,8 +38,7 @@ case class PaimonFormatTableScan(
     pushedDataFilters: Seq[Predicate],
     override val pushedLimit: Option[Int] = None)
   extends PaimonFormatTableBaseScan
-  with SupportsRuntimeFiltering
-  with ScanHelper {
+  with SupportsRuntimeFiltering {
 
   override def filterAttributes(): Array[NamedReference] = {
     val requiredFields = readBuilder.readType().getFieldNames.asScala
@@ -61,7 +60,8 @@ case class PaimonFormatTableScan(
     if (partitionFilter.nonEmpty) {
       readBuilder.withFilter(partitionFilter.head)
       // set inputPartitions null to trigger to get the new splits.
-      inputPartitions = null
+      _inputSplits = null
+      _inputPartitions = null
     }
   }
 }
diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
index 68b89d5ecc..e9eaa7d6cc 100644
--- a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
+++ b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
@@ -61,7 +61,8 @@ case class PaimonScan(
     if (partitionFilter.nonEmpty) {
       readBuilder.withFilter(partitionFilter.head)
       // set inputPartitions null to trigger to get the new splits.
-      inputPartitions = null
+      _inputSplits = null
+      _inputPartitions = null
     }
   }
 }
diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/statistics/StatisticsHelper.scala b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/statistics/StatisticsHelper.scala
deleted file mode 100644
index e64785ddee..0000000000
--- a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/statistics/StatisticsHelper.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.spark.statistics
-
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.connector.read.Statistics
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-
-trait StatisticsHelper extends StatisticsHelperBase {
-  protected def toV1Stats(v2Stats: Statistics, attrs: Seq[Attribute]): logical.Statistics = {
-    DataSourceV2Relation.transformV2Stats(v2Stats, None, conf.defaultSizeInBytes)
-  }
-}
diff --git a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/PaimonScan.scala b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
index 7c0a4d0c17..3afca15303 100644
--- a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
+++ b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
@@ -81,7 +81,7 @@ case class PaimonScan(
 
   /** Extract the bucket number from the splits only if all splits have the same totalBuckets number. */
   private def extractBucketNumber(): Option[Int] = {
-    val splits = getOriginSplits
+    val splits = inputSplits
     if (splits.exists(!_.isInstanceOf[DataSplit])) {
       None
     } else {
@@ -102,7 +102,7 @@ case class PaimonScan(
   // Since Spark 3.3
   override def outputPartitioning: Partitioning = {
     extractBucketTransform
-      .map(bucket => new KeyGroupedPartitioning(Array(bucket), lazyInputPartitions.size))
+      .map(bucket => new KeyGroupedPartitioning(Array(bucket), inputPartitions.size))
       .getOrElse(new UnknownPartitioning(0))
   }
 
@@ -142,8 +142,8 @@ case class PaimonScan(
     if (partitionFilter.nonEmpty) {
       readBuilder.withFilter(partitionFilter.head)
       // set inputPartitions null to trigger to get the new splits.
-      inputPartitions = null
-      inputSplits = null
+      _inputPartitions = null
+      _inputSplits = null
     }
   }
 }
diff --git a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/statistics/StatisticsHelper.scala b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/statistics/StatisticsHelper.scala
deleted file mode 100644
index e64785ddee..0000000000
--- a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/statistics/StatisticsHelper.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.spark.statistics
-
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.connector.read.Statistics
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-
-trait StatisticsHelper extends StatisticsHelperBase {
-  protected def toV1Stats(v2Stats: Statistics, attrs: Seq[Attribute]): logical.Statistics = {
-    DataSourceV2Relation.transformV2Stats(v2Stats, None, conf.defaultSizeInBytes)
-  }
-}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/FormatTableStatistics.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/FormatTableStatistics.scala
deleted file mode 100644
index 8863a259ef..0000000000
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/FormatTableStatistics.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.spark
-
-import org.apache.paimon.table.format.FormatDataSplit
-import org.apache.paimon.types.RowType
-
-import org.apache.spark.sql.connector.read.Statistics
-
-import java.util.OptionalLong
-
-import scala.collection.JavaConverters._
-
-case class FormatTableStatistics[T <: PaimonFormatTableBaseScan](scan: T) extends Statistics {
-
-  private lazy val fileTotalSize: Long =
-    scan.getOriginSplits
-      .map(_.asInstanceOf[FormatDataSplit])
-      .map(
-        split => {
-          if (split.length() != null) {
-            split.length().longValue()
-          } else {
-            split.fileSize()
-          }
-        })
-      .sum
-
-  override def sizeInBytes(): OptionalLong = {
-    val size = fileTotalSize /
-      estimateRowSize(scan.tableRowType) *
-      estimateRowSize(scan.readTableRowType)
-    OptionalLong.of(size)
-  }
-
-  private def estimateRowSize(rowType: RowType): Long = {
-    rowType.getFields.asScala.map(_.`type`().defaultSize().toLong).sum
-  }
-
-  override def numRows(): OptionalLong = OptionalLong.empty()
-}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
index c0b6717269..7976ea6f63 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
@@ -20,44 +20,31 @@ package org.apache.paimon.spark
 
 import org.apache.paimon.annotation.VisibleForTesting
 import org.apache.paimon.spark.metric.SparkMetricRegistry
+import org.apache.paimon.spark.scan.BaseScan
 import org.apache.paimon.spark.sources.PaimonMicroBatchStream
 import org.apache.paimon.spark.util.OptionUtils
-import org.apache.paimon.stats
 import org.apache.paimon.table.{DataTable, FileStoreTable, InnerTable}
 import org.apache.paimon.table.source.{InnerTableScan, Split}
 
+import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}
-import org.apache.spark.sql.connector.read.{Batch, Scan, Statistics, SupportsReportStatistics}
+import org.apache.spark.sql.connector.read.Batch
 import org.apache.spark.sql.connector.read.streaming.MicroBatchStream
-import org.apache.spark.sql.types.StructType
-
-import java.util.Optional
 
 import scala.collection.JavaConverters._
 
-abstract class PaimonBaseScan(table: InnerTable)
-  extends Scan
-  with SupportsReportStatistics
-  with ScanHelper
-  with ColumnPruningAndPushDown {
-
-  protected var inputPartitions: Seq[PaimonInputPartition] = _
-
-  protected var inputSplits: Array[Split] = _
-
-  lazy val statistics: Optional[stats.Statistics] = table.statistics()
+abstract class PaimonBaseScan(table: InnerTable) extends BaseScan with SQLConfHelper {
 
   private lazy val paimonMetricsRegistry: SparkMetricRegistry = SparkMetricRegistry()
 
-  lazy val requiredStatsSchema: StructType = {
-    val fieldNames = readTableRowType.getFields.asScala.map(_.name)
-    StructType(tableSchema.filter(field => fieldNames.contains(field.name)))
-  }
+  // May recalculate the splits after executing runtime filter push down.
+  protected var _inputSplits: Array[Split] = _
+  protected var _inputPartitions: Seq[PaimonInputPartition] = _
 
   @VisibleForTesting
-  def getOriginSplits: Array[Split] = {
-    if (inputSplits == null) {
-      inputSplits = readBuilder
+  def inputSplits: Array[Split] = {
+    if (_inputSplits == null) {
+      _inputSplits = readBuilder
         .newScan()
         .asInstanceOf[InnerTableScan]
         .withMetricRegistry(paimonMetricsRegistry)
@@ -66,40 +53,33 @@ abstract class PaimonBaseScan(table: InnerTable)
         .asScala
         .toArray
     }
-    inputSplits
+    _inputSplits
   }
 
-  final def lazyInputPartitions: Seq[PaimonInputPartition] = {
-    if (inputPartitions == null) {
-      inputPartitions = getInputPartitions(getOriginSplits)
+  final override def inputPartitions: Seq[PaimonInputPartition] = {
+    if (_inputPartitions == null) {
+      _inputPartitions = getInputPartitions(inputSplits)
     }
-    inputPartitions
+    _inputPartitions
   }
 
   override def toBatch: Batch = {
     ensureNoFullScan()
-    PaimonBatch(lazyInputPartitions, readBuilder, coreOptions.blobAsDescriptor(), metadataColumns)
+    super.toBatch
   }
 
   override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = {
     new PaimonMicroBatchStream(table.asInstanceOf[DataTable], readBuilder, checkpointLocation)
   }
 
-  override def estimateStatistics(): Statistics = {
-    PaimonStatistics(this)
-  }
-
   override def supportedCustomMetrics: Array[CustomMetric] = {
-    Array(
-      PaimonNumSplitMetric(),
-      PaimonPartitionSizeMetric(),
-      PaimonReadBatchTimeMetric(),
-      PaimonPlanningDurationMetric(),
-      PaimonScannedSnapshotIdMetric(),
-      PaimonScannedManifestsMetric(),
-      PaimonSkippedTableFilesMetric(),
-      PaimonResultedTableFilesMetric()
-    )
+    super.supportedCustomMetrics ++
+      Array(
+        PaimonPlanningDurationMetric(),
+        PaimonScannedSnapshotIdMetric(),
+        PaimonScannedManifestsMetric(),
+        PaimonSkippedTableFilesMetric()
+      )
   }
 
   override def reportDriverMetrics(): Array[CustomTaskMetric] = {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableBaseScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableBaseScan.scala
index 0d84664639..707aee0948 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableBaseScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableBaseScan.scala
@@ -18,63 +18,34 @@
 
 package org.apache.paimon.spark
 
+import org.apache.paimon.spark.scan.BaseScan
 import org.apache.paimon.table.FormatTable
 import org.apache.paimon.table.source.Split
 
-import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}
-import org.apache.spark.sql.connector.read.{Batch, Statistics, SupportsReportStatistics}
-
 import scala.collection.JavaConverters._
 
 /** Base Scan implementation for [[FormatTable]]. */
-abstract class PaimonFormatTableBaseScan
-  extends ColumnPruningAndPushDown
-  with SupportsReportStatistics
-  with ScanHelper {
+abstract class PaimonFormatTableBaseScan extends BaseScan {
 
-  protected var inputSplits: Array[Split] = _
-  protected var inputPartitions: Seq[PaimonInputPartition] = _
+  protected var _inputSplits: Array[Split] = _
+  protected var _inputPartitions: Seq[PaimonInputPartition] = _
 
-  def getOriginSplits: Array[Split] = {
-    if (inputSplits == null) {
-      inputSplits = readBuilder
+  def inputSplits: Array[Split] = {
+    if (_inputSplits == null) {
+      _inputSplits = readBuilder
         .newScan()
         .plan()
         .splits()
         .asScala
         .toArray
     }
-    inputSplits
+    _inputSplits
   }
 
-  final def lazyInputPartitions: Seq[PaimonInputPartition] = {
-    if (inputPartitions == null) {
-      inputPartitions = getInputPartitions(getOriginSplits)
+  final override def inputPartitions: Seq[PaimonInputPartition] = {
+    if (_inputPartitions == null) {
+      _inputPartitions = getInputPartitions(inputSplits)
     }
-    inputPartitions
-  }
-
-  override def toBatch: Batch = {
-    PaimonBatch(lazyInputPartitions, readBuilder, coreOptions.blobAsDescriptor(), metadataColumns)
-  }
-
-  override def estimateStatistics(): Statistics = {
-    FormatTableStatistics(this)
-  }
-
-  override def supportedCustomMetrics: Array[CustomMetric] = {
-    Array(
-      PaimonNumSplitMetric(),
-      PaimonPartitionSizeMetric(),
-      PaimonReadBatchTimeMetric(),
-      PaimonResultedTableFilesMetric()
-    )
-  }
-
-  override def reportDriverMetrics(): Array[CustomTaskMetric] = {
-    val filesCount = getOriginSplits.length
-    Array(
-      PaimonResultedTableFilesTaskMetric(filesCount)
-    )
+    _inputPartitions
   }
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReader.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReader.scala
index 2040982687..fd8a6178c4 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReader.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReader.scala
@@ -23,8 +23,8 @@ import org.apache.paimon.disk.IOManager
 import org.apache.paimon.spark.SparkUtils.createIOManager
 import org.apache.paimon.spark.data.SparkInternalRow
 import org.apache.paimon.spark.schema.PaimonMetadataColumn
-import org.apache.paimon.table.format.FormatDataSplit
-import org.apache.paimon.table.source.{DataSplit, ReadBuilder, Split}
+import org.apache.paimon.spark.util.SplitUtils
+import org.apache.paimon.table.source.{ReadBuilder, Split}
 import org.apache.paimon.types.RowType
 
 import org.apache.spark.sql.catalyst.InternalRow
@@ -114,13 +114,7 @@ case class PaimonPartitionReader(
   // Partition metrics need to be computed only once.
   private lazy val partitionMetrics: Array[CustomTaskMetric] = {
     val numSplits = partition.splits.length
-    val splitSize = partition.splits.map {
-      case ds: DataSplit => ds.dataFiles().asScala.map(_.fileSize).sum
-      case fs: FormatDataSplit =>
-        if (fs.length() == null) fs.fileSize() else fs.length().longValue()
-      case _ => 0
-    }.sum
-
+    val splitSize = partition.splits.map(SplitUtils.splitSize).sum
     Array(
       PaimonNumSplitsTaskMetric(numSplits),
       PaimonPartitionSizeTaskMetric(splitSize)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
index f8a6e89df8..06a97ee8b4 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
@@ -42,8 +42,11 @@ case class PaimonScan(
     override val pushedLimit: Option[Int],
     override val pushedTopN: Option[TopN],
     bucketedScanDisabled: Boolean = false)
-  extends PaimonScanCommon(table, requiredSchema, bucketedScanDisabled)
+  extends PaimonBaseScan(table)
+  with SupportsReportPartitioning
+  with SupportsReportOrdering
   with SupportsRuntimeV2Filtering {
+
   def disableBucketedScan(): PaimonScan = {
     copy(bucketedScanDisabled = true)
   }
@@ -71,19 +74,10 @@ case class PaimonScan(
     if (partitionFilter.nonEmpty) {
       readBuilder.withFilter(partitionFilter.toList.asJava)
       // set inputPartitions null to trigger to get the new splits.
-      inputPartitions = null
-      inputSplits = null
+      _inputPartitions = null
+      _inputSplits = null
     }
   }
-}
-
-abstract class PaimonScanCommon(
-    table: InnerTable,
-    requiredSchema: StructType,
-    bucketedScanDisabled: Boolean = false)
-  extends PaimonBaseScan(table)
-  with SupportsReportPartitioning
-  with SupportsReportOrdering {
 
   @transient
   private lazy val extractBucketTransform: Option[Transform] = {
@@ -122,7 +116,7 @@ abstract class PaimonScanCommon(
 
   /** Extract the bucket number from the splits only if all splits have the same totalBuckets number. */
   private def extractBucketNumber(): Option[Int] = {
-    val splits = getOriginSplits
+    val splits = inputSplits
     if (splits.exists(!_.isInstanceOf[DataSplit])) {
       None
     } else {
@@ -143,15 +137,14 @@ abstract class PaimonScanCommon(
   // Since Spark 3.3
   override def outputPartitioning: Partitioning = {
     extractBucketTransform
-      .map(bucket => new KeyGroupedPartitioning(Array(bucket), lazyInputPartitions.size))
+      .map(bucket => new KeyGroupedPartitioning(Array(bucket), inputPartitions.size))
       .getOrElse(new UnknownPartitioning(0))
   }
 
   // Since Spark 3.4
   override def outputOrdering(): Array[SortOrder] = {
     if (
-      !shouldDoBucketedScan || lazyInputPartitions.exists(
-        !_.isInstanceOf[PaimonBucketedInputPartition])
+      !shouldDoBucketedScan || inputPartitions.exists(!_.isInstanceOf[PaimonBucketedInputPartition])
     ) {
       return Array.empty
     }
@@ -164,7 +157,7 @@ abstract class PaimonScanCommon(
       return Array.empty
     }
 
-    val allSplitsKeepOrdering = lazyInputPartitions.toSeq
+    val allSplitsKeepOrdering = inputPartitions.toSeq
       .map(_.asInstanceOf[PaimonBucketedInputPartition])
       .map(_.splits.asInstanceOf[Seq[DataSplit]])
       .forall {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
index c61ce42b80..6eeaaf7b93 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
@@ -22,6 +22,7 @@ import org.apache.paimon.partition.PartitionPredicate
 import org.apache.paimon.predicate._
 import org.apache.paimon.predicate.SortValue.{NullOrdering, SortDirection}
 import org.apache.paimon.spark.aggregate.AggregatePushDownUtils.tryPushdownAggregation
+import org.apache.paimon.spark.scan.PaimonLocalScan
 import org.apache.paimon.table.{FileStoreTable, InnerTable}
 
 import org.apache.spark.sql.connector.expressions
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkCopyOnWriteOperation.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkCopyOnWriteOperation.scala
index 96256468e9..b939ba8ef5 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkCopyOnWriteOperation.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkCopyOnWriteOperation.scala
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions
 import org.apache.paimon.CoreOptions.BucketFunctionType
 import org.apache.paimon.options.Options
 import org.apache.paimon.spark.catalog.functions.BucketFunction
+import org.apache.paimon.spark.scan.PaimonCopyOnWriteScan
 import org.apache.paimon.spark.schema.PaimonMetadataColumn.FILE_PATH_COLUMN
 import org.apache.paimon.spark.util.OptionUtils
 import org.apache.paimon.spark.write.PaimonV2WriteBuilder
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala
index 22758dea0f..867de1109c 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions
 import org.apache.paimon.CoreOptions.BucketFunctionType
 import org.apache.paimon.options.Options
 import org.apache.paimon.spark.catalog.functions.BucketFunction
+import org.apache.paimon.spark.scan.PaimonSplitScanBuilder
 import org.apache.paimon.spark.schema.PaimonMetadataColumn
 import org.apache.paimon.spark.util.OptionUtils
 import org.apache.paimon.spark.write.{PaimonV2WriteBuilder, PaimonWriteBuilder}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/adaptive/DisableUnnecessaryPaimonBucketedScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/adaptive/DisableUnnecessaryPaimonBucketedScan.scala
index db3c8fc692..b0101ded21 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/adaptive/DisableUnnecessaryPaimonBucketedScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/adaptive/DisableUnnecessaryPaimonBucketedScan.scala
@@ -153,7 +153,7 @@ object DisableUnnecessaryPaimonBucketedScan extends Rule[SparkPlan] {
     plan match {
       case batch: BatchScanExec =>
         batch.scan match {
-          case scan: PaimonScan if scan.lazyInputPartitions.forall(_.bucketed) =>
+          case scan: PaimonScan if scan.inputPartitions.forall(_.bucketed) =>
             Some((batch, scan))
           case _ => None
         }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/BaseScan.scala
similarity index 71%
rename from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala
rename to paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/BaseScan.scala
index 98daf2eaef..ffacda9853 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/BaseScan.scala
@@ -16,24 +16,28 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.spark
+package org.apache.paimon.spark.scan
 
 import org.apache.paimon.CoreOptions
 import org.apache.paimon.partition.PartitionPredicate
 import org.apache.paimon.predicate.{Predicate, TopN}
+import org.apache.paimon.spark.{PaimonBatch, PaimonInputPartition, PaimonNumSplitMetric, PaimonPartitionSizeMetric, PaimonReadBatchTimeMetric, PaimonResultedTableFilesMetric, PaimonResultedTableFilesTaskMetric, SparkTypeUtils}
 import org.apache.paimon.spark.schema.PaimonMetadataColumn
 import org.apache.paimon.spark.schema.PaimonMetadataColumn._
+import org.apache.paimon.spark.util.SplitUtils
 import org.apache.paimon.table.{SpecialFields, Table}
-import org.apache.paimon.table.source.ReadBuilder
+import org.apache.paimon.table.source.{ReadBuilder, Split}
 import org.apache.paimon.types.RowType
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.connector.read.Scan
+import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}
+import org.apache.spark.sql.connector.read.{Batch, Scan, Statistics, SupportsReportStatistics}
 import org.apache.spark.sql.types.StructType
 
 import scala.collection.JavaConverters._
 
-trait ColumnPruningAndPushDown extends Scan with Logging {
+/** Base scan. */
+trait BaseScan extends Scan with SupportsReportStatistics with Logging {
 
   def table: Table
 
@@ -46,6 +50,12 @@ trait ColumnPruningAndPushDown extends Scan with Logging {
   def pushedLimit: Option[Int] = None
   def pushedTopN: Option[TopN] = None
 
+  // Input splits
+  def inputSplits: Array[Split]
+  def inputPartitions: Seq[PaimonInputPartition] = getInputPartitions(inputSplits)
+  def getInputPartitions(splits: Array[Split]): Seq[PaimonInputPartition] =
+    BinPackingSplits(coreOptions).pack(splits)
+
   val coreOptions: CoreOptions = CoreOptions.fromMap(table.options())
 
   lazy val tableRowType: RowType = {
@@ -56,12 +66,6 @@ trait ColumnPruningAndPushDown extends Scan with Logging {
     }
   }
 
-  lazy val tableSchema: StructType = SparkTypeUtils.fromPaimonRowType(tableRowType)
-
-  final def partitionType: StructType = {
-    SparkTypeUtils.toSparkPartitionType(table)
-  }
-
   private[paimon] val (readTableRowType, metadataFields) = {
     requiredSchema.fields.foreach(f => checkMetadataColumn(f.name))
     val (_requiredTableFields, _metadataFields) =
@@ -102,10 +106,6 @@ trait ColumnPruningAndPushDown extends Scan with Logging {
     _readBuilder.dropStats()
   }
 
-  final def metadataColumns: Seq[PaimonMetadataColumn] = {
-    metadataFields.map(field => PaimonMetadataColumn.get(field.name, partitionType))
-  }
-
   override def readSchema(): StructType = {
     val _readSchema = StructType(
       SparkTypeUtils.fromPaimonRowType(readTableRowType).fields ++ metadataFields)
@@ -116,6 +116,36 @@ trait ColumnPruningAndPushDown extends Scan with Logging {
     _readSchema
   }
 
+  override def toBatch: Batch = {
+    val metadataColumns = metadataFields.map(
+      field => PaimonMetadataColumn.get(field.name, SparkTypeUtils.toSparkPartitionType(table)))
+    PaimonBatch(inputPartitions, readBuilder, coreOptions.blobAsDescriptor(), metadataColumns)
+  }
+
+  def estimateStatistics: Statistics = {
+    PaimonStatistics(
+      inputSplits,
+      SparkTypeUtils.toPaimonRowType(readSchema()),
+      table.rowType(),
+      table.statistics())
+  }
+
+  override def supportedCustomMetrics: Array[CustomMetric] = {
+    Array(
+      PaimonNumSplitMetric(),
+      PaimonPartitionSizeMetric(),
+      PaimonReadBatchTimeMetric(),
+      PaimonResultedTableFilesMetric()
+    )
+  }
+
+  override def reportDriverMetrics(): Array[CustomTaskMetric] = {
+    val filesCount = inputSplits.map(SplitUtils.fileCount).sum
+    Array(
+      PaimonResultedTableFilesTaskMetric(filesCount)
+    )
+  }
+
   override def description(): String = {
     val pushedPartitionFiltersStr = if (pushedPartitionFilters.nonEmpty) {
       ", PartitionFilters: [" + pushedPartitionFilters.mkString(",") + "]"
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/BinPackingSplits.scala
similarity index 94%
rename from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala
rename to paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/BinPackingSplits.scala
index b14abebd64..f22bce3ec1 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/BinPackingSplits.scala
@@ -16,11 +16,12 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.spark
+package org.apache.paimon.spark.scan
 
 import org.apache.paimon.CoreOptions
 import org.apache.paimon.CoreOptions._
 import org.apache.paimon.io.DataFileMeta
+import org.apache.paimon.spark.PaimonInputPartition
 import org.apache.paimon.table.FallbackReadFileStoreTable.FallbackDataSplit
 import org.apache.paimon.table.source.{DataSplit, DeletionFile, Split}
 
@@ -32,12 +33,10 @@ import org.apache.spark.sql.internal.SQLConf
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
-trait ScanHelper extends SQLConfHelper with Logging {
+case class BinPackingSplits(coreOptions: CoreOptions) extends SQLConfHelper with Logging {
 
   private val spark = PaimonSparkSession.active
 
-  val coreOptions: CoreOptions
-
   private lazy val deletionVectors: Boolean = coreOptions.deletionVectorsEnabled()
 
   private lazy val filesMaxPartitionBytes: Long = {
@@ -72,7 +71,7 @@ trait ScanHelper extends SQLConfHelper with Logging {
       .getOrElse(spark.sparkContext.defaultParallelism)
   }
 
-  def getInputPartitions(splits: Array[Split]): Seq[PaimonInputPartition] = {
+  def pack(splits: Array[Split]): Seq[PaimonInputPartition] = {
     val (toReshuffle, reserved) = splits.partition {
       case _: FallbackDataSplit => false
       case split: DataSplit => split.beforeFiles().isEmpty && split.rawConvertible()
@@ -80,7 +79,7 @@ trait ScanHelper extends SQLConfHelper with Logging {
     }
     if (toReshuffle.nonEmpty) {
       val startTS = System.currentTimeMillis()
-      val reshuffled = getInputPartitions(toReshuffle.collect { case ds: DataSplit => ds })
+      val reshuffled = packDataSplit(toReshuffle.collect { case ds: DataSplit => ds })
       val all = reserved.map(PaimonInputPartition.apply) ++ reshuffled
       val duration = System.currentTimeMillis() - startTS
       logInfo(
@@ -92,7 +91,7 @@ trait ScanHelper extends SQLConfHelper with Logging {
     }
   }
 
-  private def getInputPartitions(splits: Array[DataSplit]): Array[PaimonInputPartition] = {
+  private def packDataSplit(splits: Array[DataSplit]): Array[PaimonInputPartition] = {
     val maxSplitBytes = computeMaxSplitBytes(splits)
 
     var currentSize = 0L
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonCopyOnWriteScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/PaimonCopyOnWriteScan.scala
similarity index 77%
rename from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonCopyOnWriteScan.scala
rename to paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/PaimonCopyOnWriteScan.scala
index 7e45f4be1b..6ee1ba38df 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonCopyOnWriteScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/PaimonCopyOnWriteScan.scala
@@ -16,10 +16,11 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.spark
+package org.apache.paimon.spark.scan
 
 import org.apache.paimon.partition.PartitionPredicate
-import org.apache.paimon.predicate.Predicate
+import org.apache.paimon.predicate.{Predicate, PredicateBuilder}
+import org.apache.paimon.spark.PaimonBatch
 import org.apache.paimon.spark.schema.PaimonMetadataColumn.FILE_PATH_COLUMN
 import org.apache.paimon.table.{FileStoreTable, InnerTable}
 import org.apache.paimon.table.source.{DataSplit, Split}
@@ -40,21 +41,15 @@ case class PaimonCopyOnWriteScan(
     table: InnerTable,
     requiredSchema: StructType,
     pushedPartitionFilters: Seq[PartitionPredicate],
-    pushedDataFilters: Seq[Predicate],
-    bucketedScanDisabled: Boolean = false)
-  extends PaimonScanCommon(table, requiredSchema, bucketedScanDisabled)
+    pushedDataFilters: Seq[Predicate])
+  extends BaseScan
   with SupportsRuntimeV2Filtering {
 
-  var filteredLocations: mutable.Set[String] = mutable.Set[String]()
-
-  var filteredFileNames: mutable.Set[String] = mutable.Set[String]()
+  override def inputSplits: Array[Split] = dataSplits.asInstanceOf[Array[Split]]
+  private val filteredFileNames: mutable.Set[String] = mutable.Set[String]()
 
   var dataSplits: Array[DataSplit] = Array()
 
-  def disableBucketedScan(): PaimonCopyOnWriteScan = {
-    copy(bucketedScanDisabled = true)
-  }
-
   override def filterAttributes(): Array[NamedReference] = {
     Array(Expressions.column(FILE_PATH_COLUMN))
   }
@@ -66,7 +61,6 @@ case class PaimonCopyOnWriteScan(
         case in: In if in.attribute.equalsIgnoreCase(FILE_PATH_COLUMN) =>
           for (value <- in.values) {
             val location = value.asInstanceOf[String]
-            filteredLocations.add(location)
             filteredFileNames.add(Paths.get(location).getFileName.toString)
           }
         case _ => logWarning("Unsupported runtime filter")
@@ -79,26 +73,17 @@ case class PaimonCopyOnWriteScan(
         if (fileStoreTable.coreOptions().manifestDeleteFileDropStats()) {
           snapshotReader.dropStats()
         }
-
-        pushedPartitionFilters.foreach(snapshotReader.withPartitionFilter)
-
-        pushedDataFilters.foreach(snapshotReader.withFilter)
-
+        if (pushedPartitionFilters.nonEmpty) {
+          snapshotReader.withPartitionFilter(PartitionPredicate.and(pushedPartitionFilters.asJava))
+        }
+        if (pushedDataFilters.nonEmpty) {
+          snapshotReader.withFilter(PredicateBuilder.and(pushedDataFilters.asJava))
+        }
         snapshotReader.withDataFileNameFilter(fileName => filteredFileNames.contains(fileName))
-
         dataSplits =
           snapshotReader.read().splits().asScala.collect { case s: DataSplit => s }.toArray
 
       case _ => throw new RuntimeException("Only FileStoreTable support.")
     }
-
-  }
-
-  override def toBatch: Batch = {
-    PaimonBatch(
-      getInputPartitions(dataSplits.asInstanceOf[Array[Split]]),
-      readBuilder,
-      coreOptions.blobAsDescriptor(),
-      metadataColumns)
   }
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonLocalScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/PaimonLocalScan.scala
similarity index 97%
rename from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonLocalScan.scala
rename to paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/PaimonLocalScan.scala
index b4f8b3b785..7218e9e226 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonLocalScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/PaimonLocalScan.scala
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.spark
+package org.apache.paimon.spark.scan
 
 import org.apache.paimon.partition.PartitionPredicate
 import org.apache.paimon.table.Table
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/PaimonSplitScan.scala
similarity index 67%
rename from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala
rename to paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/PaimonSplitScan.scala
index f5998ff73b..87185264f4 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/PaimonSplitScan.scala
@@ -16,14 +16,14 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.spark
+package org.apache.paimon.spark.scan
 
 import org.apache.paimon.partition.PartitionPredicate
 import org.apache.paimon.predicate.Predicate
+import org.apache.paimon.spark.{PaimonBaseScanBuilder, PaimonBatch}
 import org.apache.paimon.table.{InnerTable, KnownSplitsTable}
 import org.apache.paimon.table.source.{DataSplit, Split}
 
-import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}
 import org.apache.spark.sql.connector.read.{Batch, Scan}
 import org.apache.spark.sql.types.StructType
 
@@ -46,30 +46,7 @@ case class PaimonSplitScan(
     requiredSchema: StructType,
     pushedPartitionFilters: Seq[PartitionPredicate],
     pushedDataFilters: Seq[Predicate])
-  extends ColumnPruningAndPushDown
-  with ScanHelper {
+  extends BaseScan {
 
-  override def toBatch: Batch = {
-    PaimonBatch(
-      getInputPartitions(dataSplits.asInstanceOf[Array[Split]]),
-      readBuilder,
-      coreOptions.blobAsDescriptor(),
-      metadataColumns)
-  }
-
-  override def supportedCustomMetrics: Array[CustomMetric] = {
-    Array(
-      PaimonNumSplitMetric(),
-      PaimonPartitionSizeMetric(),
-      PaimonReadBatchTimeMetric(),
-      PaimonResultedTableFilesMetric()
-    )
-  }
-
-  override def reportDriverMetrics(): Array[CustomTaskMetric] = {
-    val filesCount = dataSplits.map(_.dataFiles().size).sum
-    Array(
-      PaimonResultedTableFilesTaskMetric(filesCount)
-    )
-  }
+  override def inputSplits: Array[Split] = dataSplits.asInstanceOf[Array[Split]]
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonStatistics.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/PaimonStatistics.scala
similarity index 61%
rename from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonStatistics.scala
rename to paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/PaimonStatistics.scala
index cd8dcd1165..4ffd9ae64e 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonStatistics.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/PaimonStatistics.scala
@@ -16,11 +16,14 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.spark
+package org.apache.paimon.spark.scan
 
+import org.apache.paimon.spark.DataConverter
+import org.apache.paimon.spark.util.SplitUtils
 import org.apache.paimon.stats
 import org.apache.paimon.stats.ColStats
-import org.apache.paimon.types.{DataField, DataType}
+import org.apache.paimon.table.source.Split
+import org.apache.paimon.types.{DataField, DataType, RowType}
 
 import org.apache.spark.sql.PaimonUtils
 import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
@@ -32,48 +35,64 @@ import java.util.{Optional, OptionalLong}
 
 import scala.collection.JavaConverters._
 
-case class PaimonStatistics[T <: PaimonBaseScan](scan: T) extends Statistics {
-
-  import PaimonImplicits._
+case class PaimonStatistics(
+    splits: Array[Split],
+    readRowType: RowType,
+    tableRowType: RowType,
+    paimonStats: Optional[stats.Statistics]
+) extends Statistics {
+
+  lazy val numRows: OptionalLong = {
+    if (splits.exists(_.rowCount() == -1)) {
+      OptionalLong.empty()
+    } else {
+      OptionalLong.of(splits.map(_.rowCount()).sum)
+    }
+  }
 
-  private lazy val paimonStats: Option[stats.Statistics] = scan.statistics
+  lazy val sizeInBytes: OptionalLong = {
+    if (numRows.isPresent) {
+      val sizeInBytes = numRows.getAsLong * estimateRowSize(readRowType)
+      // Avoid return 0 bytes if there are some valid rows.
+      // Avoid return too small size in bytes which may less than row count,
+      // note the compression ratio on disk is usually bigger than memory.
+      OptionalLong.of(Math.max(sizeInBytes, numRows.getAsLong))
+    } else {
+      val fileTotalSize = splits.map(SplitUtils.splitSize).sum
+      if (fileTotalSize == 0) {
+        OptionalLong.empty()
+      } else {
+        val size = (fileTotalSize * readRowSizeRatio).toLong
+        OptionalLong.of(size)
+      }
+    }
+  }
 
-  private lazy val rowCount: Long = scan.lazyInputPartitions.map(_.rowCount()).sum
+  lazy val readRowSizeRatio: Double = estimateRowSize(readRowType) / estimateRowSize(tableRowType)
 
-  private lazy val scannedTotalSize: Long = {
-    val readSchemaSize =
-      SparkTypeUtils.toPaimonRowType(scan.readSchema()).getFields.asScala.map(getSizeForField).sum
-    val sizeInBytes = rowCount * readSchemaSize
-    // Avoid return 0 bytes if there are some valid rows.
-    // Avoid return too small size in bytes which may less than row count,
-    // note the compression ratio on disk is usually bigger than memory.
-    Math.max(sizeInBytes, rowCount)
+  private def estimateRowSize(rowType: RowType): Long = {
+    rowType.getFields.asScala.map(estimateFieldSize).sum
   }
 
-  private def getSizeForField(field: DataField): Long = {
-    paimonStats match {
-      case Some(stats) =>
-        val colStat = stats.colStats().get(field.name())
-        if (colStat != null && colStat.avgLen().isPresent) {
-          colStat.avgLen().getAsLong
-        } else {
-          field.`type`().defaultSize().toLong
-        }
-      case _ =>
+  private def estimateFieldSize(field: DataField): Long = {
+    if (paimonStats.isPresent) {
+      val colStat = paimonStats.get.colStats().get(field.name())
+      if (colStat != null && colStat.avgLen().isPresent) {
+        colStat.avgLen().getAsLong
+      } else {
         field.`type`().defaultSize().toLong
+      }
+    } else {
+      field.`type`().defaultSize().toLong
     }
   }
 
-  override def numRows(): OptionalLong = OptionalLong.of(rowCount)
-
-  override def sizeInBytes(): OptionalLong = OptionalLong.of(scannedTotalSize)
-
-  override def columnStats(): java.util.Map[NamedReference, ColumnStatistics] = {
-    val requiredFields = scan.requiredStatsSchema.fieldNames
+  override lazy val columnStats: java.util.Map[NamedReference, ColumnStatistics] = {
+    val requiredFields = readRowType.getFieldNames.asScala
     val resultMap = new java.util.HashMap[NamedReference, ColumnStatistics]()
-    if (paimonStats.isDefined) {
+    if (paimonStats.isPresent) {
       val paimonColStats = paimonStats.get.colStats()
-      scan.tableRowType.getFields.asScala
+      tableRowType.getFields.asScala
         .filter {
           field => requiredFields.contains(field.name) && paimonColStats.containsKey(field.name())
         }
@@ -113,7 +132,7 @@ object PaimonColumnStats {
   }
 
   def apply(v1ColStats: ColumnStat): PaimonColumnStats = {
-    import PaimonImplicits._
+    import org.apache.paimon.spark.PaimonImplicits._
     PaimonColumnStats(
       if (v1ColStats.nullCount.isDefined) OptionalLong.of(v1ColStats.nullCount.get.longValue)
       else OptionalLong.empty(),
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/statistics/StatisticsHelperBase.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/statistics/StatisticsHelperBase.scala
deleted file mode 100644
index 627c6a1688..0000000000
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/statistics/StatisticsHelperBase.scala
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.spark.statistics
-
-import org.apache.paimon.spark.PaimonColumnStats
-
-import org.apache.spark.sql.PaimonUtils
-import org.apache.spark.sql.catalyst.{SQLConfHelper, StructFilters}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BoundReference, Expression}
-import org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.FilterEstimation
-import org.apache.spark.sql.catalyst.util.CharVarcharUtils
-import org.apache.spark.sql.connector.expressions.NamedReference
-import org.apache.spark.sql.connector.read.Statistics
-import org.apache.spark.sql.connector.read.colstats.ColumnStatistics
-import org.apache.spark.sql.sources.{And, Filter}
-import org.apache.spark.sql.types.StructType
-
-import java.util.OptionalLong
-
-trait StatisticsHelperBase extends SQLConfHelper {
-
-  val requiredStatsSchema: StructType
-
-  private lazy val replacedStatsSchema =
-    CharVarcharUtils.replaceCharVarcharWithStringInSchema(requiredStatsSchema)
-
-  def filterStatistics(v2Stats: Statistics, filters: Seq[Filter]): Statistics = {
-    val attrs: Seq[AttributeReference] =
-      replacedStatsSchema.map(f => AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
-    val condition = filterToCondition(filters, attrs)
-
-    if (condition.isDefined && v2Stats.numRows().isPresent) {
-      val filteredStats = FilterEstimation(
-        logical.Filter(condition.get, FakePlanWithStats(toV1Stats(v2Stats, attrs)))).estimate.get
-      toV2Stats(filteredStats)
-    } else {
-      v2Stats
-    }
-  }
-
-  private def filterToCondition(filters: Seq[Filter], attrs: Seq[Attribute]): Option[Expression] = {
-    StructFilters.filterToExpression(filters.reduce(And), toRef).map {
-      expression =>
-        expression.transform {
-          case ref: BoundReference =>
-            attrs.find(_.name == replacedStatsSchema(ref.ordinal).name).get
-        }
-    }
-  }
-
-  private def toRef(attr: String): Option[BoundReference] = {
-    val index = replacedStatsSchema.fieldIndex(attr)
-    val field = replacedStatsSchema(index)
-    Option.apply(BoundReference(index, field.dataType, field.nullable))
-  }
-
-  protected def toV1Stats(v2Stats: Statistics, attrs: Seq[Attribute]): logical.Statistics
-
-  private def toV2Stats(v1Stats: logical.Statistics): Statistics = {
-    new Statistics() {
-      override def sizeInBytes(): OptionalLong = if (v1Stats.sizeInBytes != null)
-        OptionalLong.of(v1Stats.sizeInBytes.longValue)
-      else OptionalLong.empty()
-
-      override def numRows(): OptionalLong = if (v1Stats.rowCount.isDefined)
-        OptionalLong.of(v1Stats.rowCount.get.longValue)
-      else OptionalLong.empty()
-
-      override def columnStats(): java.util.Map[NamedReference, ColumnStatistics] = {
-        val columnStatsMap = new java.util.HashMap[NamedReference, ColumnStatistics]()
-        v1Stats.attributeStats.foreach {
-          case (attr, v1ColStats) =>
-            columnStatsMap.put(
-              PaimonUtils.fieldReference(attr.name),
-              PaimonColumnStats(v1ColStats)
-            )
-        }
-        columnStatsMap
-      }
-    }
-  }
-}
-
-case class FakePlanWithStats(v1Stats: logical.Statistics) extends LogicalPlan {
-  override def output: Seq[Attribute] = Seq.empty
-  override def children: Seq[LogicalPlan] = Seq.empty
-  override protected def withNewChildrenInternal(
-      newChildren: IndexedSeq[LogicalPlan]): LogicalPlan = throw new UnsupportedOperationException
-  override def stats: logical.Statistics = v1Stats
-}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/statistics/StatisticsHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SplitUtils.scala
similarity index 55%
rename from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/statistics/StatisticsHelper.scala
rename to paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SplitUtils.scala
index d54ac05fb4..e2c14845c6 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/statistics/StatisticsHelper.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SplitUtils.scala
@@ -16,15 +16,29 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.spark.statistics
+package org.apache.paimon.spark.util
 
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.connector.read.Statistics
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.paimon.table.format.FormatDataSplit
+import org.apache.paimon.table.source.{DataSplit, Split}
 
-trait StatisticsHelper extends StatisticsHelperBase {
-  protected def toV1Stats(v2Stats: Statistics, attrs: Seq[Attribute]): 
logical.Statistics = {
-    DataSourceV2Relation.transformV2Stats(v2Stats, None, 
conf.defaultSizeInBytes, attrs)
+import scala.collection.JavaConverters._
+
+object SplitUtils {
+
+  def splitSize(split: Split): Long = {
+    split match {
+      case ds: DataSplit => ds.dataFiles().asScala.map(_.fileSize).sum
+      case fs: FormatDataSplit =>
+        if (fs.length() == null) fs.fileSize() else fs.length().longValue()
+      case _ => 0
+    }
+  }
+
+  def fileCount(split: Split): Long = {
+    split match {
+      case ds: DataSplit => ds.dataFiles().size()
+      case _: FormatDataSplit => 1
+      case _ => 0
+    }
   }
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/BaseV2WriteBuilder.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/BaseV2WriteBuilder.scala
index 17c8e9800f..dda04245cf 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/BaseV2WriteBuilder.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/BaseV2WriteBuilder.scala
@@ -18,7 +18,7 @@
 
 package org.apache.paimon.spark.write
 
-import org.apache.paimon.spark.PaimonCopyOnWriteScan
+import org.apache.paimon.spark.scan.PaimonCopyOnWriteScan
 import org.apache.paimon.table.Table
 
 import org.apache.spark.sql.connector.write.{SupportsDynamicOverwrite, SupportsOverwrite, WriteBuilder}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
index ba89e84e2e..5a46e208f5 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
@@ -26,6 +26,7 @@ import org.apache.paimon.spark.catalyst.Compatibility
 import org.apache.paimon.spark.commands.{SchemaHelper, SparkDataFileMeta}
 import org.apache.paimon.spark.commands.SparkDataFileMeta.convertToSparkDataFileMeta
 import org.apache.paimon.spark.metric.SparkMetricRegistry
+import org.apache.paimon.spark.scan.PaimonCopyOnWriteScan
 import org.apache.paimon.table.FileStoreTable
 import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage, CommitMessageImpl, TableWriteImpl}
 import org.apache.paimon.table.source.DataSplit
@@ -216,11 +217,8 @@ private case class CopyOnWriteBatchWrite(
         batchTableCommit.truncateTable()
       } else {
         val touchedFiles = candidateFiles(scan.get.dataSplits)
-
         val deletedCommitMessage = buildDeletedCommitMessage(touchedFiles)
-
         val addCommitMessages = WriteTaskResult.merge(messages)
-
         val commitMessages = addCommitMessages ++ deletedCommitMessage
 
         batchTableCommit.withMetricRegistry(metricRegistry)
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/BinPackingSplitsTest.scala
similarity index 87%
rename from paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala
rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/BinPackingSplitsTest.scala
index cf0fdc91cd..72941b8c0d 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/BinPackingSplitsTest.scala
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions
 import org.apache.paimon.data.BinaryRow
 import org.apache.paimon.io.DataFileMeta
 import org.apache.paimon.manifest.FileSource
+import org.apache.paimon.spark.scan.BinPackingSplits
 import org.apache.paimon.table.source.{DataSplit, Split}
 
 import org.junit.jupiter.api.Assertions
@@ -31,7 +32,7 @@ import java.util.{HashMap => JHashMap}
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
-class ScanHelperTest extends PaimonSparkTestBase {
+class BinPackingSplitsTest extends PaimonSparkTestBase {
 
   test("Paimon: reshuffle splits") {
     withSparkSQLConf(("spark.sql.leafNodeDefaultParallelism", "20")) {
@@ -73,8 +74,8 @@ class ScanHelperTest extends PaimonSparkTestBase {
             .build()
       }
 
-      val fakeScan = new FakeScan()
-      val reshuffled = fakeScan.getInputPartitions(dataSplits.toArray)
+      val binPacking = BinPackingSplits(CoreOptions.fromMap(new JHashMap()))
+      val reshuffled = binPacking.pack(dataSplits.toArray)
       Assertions.assertTrue(reshuffled.length > 5)
     }
   }
@@ -110,8 +111,8 @@ class ScanHelperTest extends PaimonSparkTestBase {
         .build()
     )
 
-    val fakeScan = new FakeScan()
-    val reshuffled = fakeScan.getInputPartitions(dataSplits)
+    val binPacking = BinPackingSplits(CoreOptions.fromMap(new JHashMap()))
+    val reshuffled = binPacking.pack(dataSplits)
     Assertions.assertEquals(1, reshuffled.length)
   }
 
@@ -126,13 +127,13 @@ class ScanHelperTest extends PaimonSparkTestBase {
 
       // default openCostInBytes is 4m, so we will get 400 / 128 = 4 partitions
       withSparkSQLConf("spark.sql.leafNodeDefaultParallelism" -> "1") {
-        assert(paimonScan().lazyInputPartitions.length == 4)
+        assert(paimonScan().inputPartitions.length == 4)
       }
 
       withSparkSQLConf(
         "spark.sql.files.openCostInBytes" -> "0",
         "spark.sql.leafNodeDefaultParallelism" -> "1") {
-        assert(paimonScan().lazyInputPartitions.length == 1)
+        assert(paimonScan().inputPartitions.length == 1)
       }
 
       // Paimon's conf takes precedence over Spark's
@@ -140,13 +141,8 @@ class ScanHelperTest extends PaimonSparkTestBase {
         "spark.sql.files.openCostInBytes" -> "4194304",
         "spark.paimon.source.split.open-file-cost" -> "0",
         "spark.sql.leafNodeDefaultParallelism" -> "1") {
-        assert(paimonScan().lazyInputPartitions.length == 1)
+        assert(paimonScan().inputPartitions.length == 1)
       }
     }
   }
-
-  class FakeScan extends ScanHelper {
-    override val coreOptions: CoreOptions =
-      CoreOptions.fromMap(new JHashMap[String, String]())
-  }
 }
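
With ScanHelper and its FakeScan wrapper gone, the test drives BinPackingSplits
directly. A minimal usage sketch mirroring the calls above, assuming only the
constructor and pack signature that appear in this diff:

    import java.util.{HashMap => JHashMap}

    import org.apache.paimon.CoreOptions
    import org.apache.paimon.spark.scan.BinPackingSplits
    import org.apache.paimon.table.source.Split

    // Pack raw table splits into Spark input partitions using default core options;
    // tuning source.split.open-file-cost changes how aggressively small splits are merged.
    def packWithDefaults(splits: Array[Split]) = {
      val binPacking = BinPackingSplits(CoreOptions.fromMap(new JHashMap[String, String]()))
      binPacking.pack(splits)
    }
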
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
index 2eb9aa253e..1dedb2cfca 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
@@ -21,7 +21,8 @@ package org.apache.paimon.spark.sql
 import org.apache.paimon.data.BinaryRow
 import org.apache.paimon.deletionvectors.{BucketedDvMaintainer, BucketedDvMaintainerTest, DeletionVector}
 import org.apache.paimon.fs.Path
-import org.apache.paimon.spark.{PaimonSparkTestBase, PaimonSplitScan}
+import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.spark.scan.PaimonSplitScan
 import org.apache.paimon.spark.schema.PaimonMetadataColumn
 import org.apache.paimon.table.FileStoreTable
 
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala
index d776a00f93..b75ac07d7c 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala
@@ -18,8 +18,9 @@
 
 package org.apache.paimon.spark.sql
 
-import org.apache.paimon.spark.{PaimonSparkTestBase, PaimonSplitScan}
 import org.apache.paimon.spark.PaimonMetrics.{RESULTED_TABLE_FILES, SCANNED_SNAPSHOT_ID, SKIPPED_TABLE_FILES}
+import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.spark.scan.PaimonSplitScan
 import org.apache.paimon.spark.util.ScanPlanHelper
 import org.apache.paimon.table.source.DataSplit
 
@@ -52,7 +53,7 @@ class PaimonMetricTest extends PaimonSparkTestBase with ScanPlanHelper {
           resultedTableFiles: Long): Unit = {
         val scan = getPaimonScan(s)
         // call getInputPartitions to trigger scan
-        scan.lazyInputPartitions
+        scan.inputPartitions
         val metrics = scan.reportDriverMetrics()
         Assertions.assertEquals(scannedSnapshotId, metric(metrics, SCANNED_SNAPSHOT_ID))
         Assertions.assertEquals(skippedTableFiles, metric(metrics, SKIPPED_TABLE_FILES))
@@ -78,7 +79,7 @@ class PaimonMetricTest extends PaimonSparkTestBase with ScanPlanHelper {
       sql(s"INSERT INTO T VALUES (1, 'a'), (2, 'b')")
       sql(s"INSERT INTO T VALUES (3, 'c')")
 
-      val splits = getPaimonScan("SELECT * FROM T").getOriginSplits.map(_.asInstanceOf[DataSplit])
+      val splits = getPaimonScan("SELECT * FROM T").inputSplits.map(_.asInstanceOf[DataSplit])
       val df = createDataset(spark, createNewScanPlan(splits, createRelationV2("T")))
       val scan = df.queryExecution.optimizedPlan
         .collectFirst { case relation: DataSourceV2ScanRelation => relation }
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
index c23694e1d1..4ca4580aea 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
@@ -215,12 +215,12 @@ abstract class PaimonPushDownTestBase extends PaimonSparkTestBase {
     val scanBuilder = getScanBuilder()
     Assertions.assertTrue(scanBuilder.isInstanceOf[SupportsPushDownLimit])
 
-    val dataSplitsWithoutLimit = scanBuilder.build().asInstanceOf[PaimonScan].getOriginSplits
+    val dataSplitsWithoutLimit = scanBuilder.build().asInstanceOf[PaimonScan].inputSplits
     Assertions.assertTrue(dataSplitsWithoutLimit.length >= 2)
 
     // It still returns false even it can push down limit.
     Assertions.assertFalse(scanBuilder.asInstanceOf[SupportsPushDownLimit].pushLimit(1))
-    val dataSplitsWithLimit = scanBuilder.build().asInstanceOf[PaimonScan].getOriginSplits
+    val dataSplitsWithLimit = scanBuilder.build().asInstanceOf[PaimonScan].inputSplits
     Assertions.assertEquals(1, dataSplitsWithLimit.length)
 
     Assertions.assertEquals(1, spark.sql("SELECT * FROM T LIMIT 1").count())
@@ -240,7 +240,7 @@ abstract class PaimonPushDownTestBase extends PaimonSparkTestBase {
     Assertions.assertTrue(scanBuilder.isInstanceOf[SupportsPushDownLimit])
 
     // Case 1: All dataSplits is rawConvertible.
-    val dataSplitsWithoutLimit = scanBuilder.build().asInstanceOf[PaimonScan].getOriginSplits
+    val dataSplitsWithoutLimit = scanBuilder.build().asInstanceOf[PaimonScan].inputSplits
     Assertions.assertEquals(4, dataSplitsWithoutLimit.length)
     // All dataSplits is rawConvertible.
     dataSplitsWithoutLimit.foreach(
@@ -250,19 +250,19 @@ abstract class PaimonPushDownTestBase extends PaimonSparkTestBase {
 
     // It still returns false even it can push down limit.
     Assertions.assertFalse(scanBuilder.asInstanceOf[SupportsPushDownLimit].pushLimit(1))
-    val dataSplitsWithLimit = scanBuilder.build().asInstanceOf[PaimonScan].getOriginSplits
+    val dataSplitsWithLimit = scanBuilder.build().asInstanceOf[PaimonScan].inputSplits
     Assertions.assertEquals(1, dataSplitsWithLimit.length)
     Assertions.assertEquals(1, spark.sql("SELECT * FROM T LIMIT 1").count())
 
     Assertions.assertFalse(scanBuilder.asInstanceOf[SupportsPushDownLimit].pushLimit(2))
-    val dataSplitsWithLimit1 = scanBuilder.build().asInstanceOf[PaimonScan].getOriginSplits
+    val dataSplitsWithLimit1 = scanBuilder.build().asInstanceOf[PaimonScan].inputSplits
     Assertions.assertEquals(2, dataSplitsWithLimit1.length)
     Assertions.assertEquals(2, spark.sql("SELECT * FROM T LIMIT 2").count())
 
     // Case 2: Update 2 rawConvertible dataSplits to convert to nonRawConvertible.
     spark.sql("INSERT INTO T VALUES (1, 'a2', '11'), (2, 'b2', '22')")
     val scanBuilder2 = getScanBuilder()
-    val dataSplitsWithoutLimit2 = scanBuilder2.build().asInstanceOf[PaimonScan].getOriginSplits
+    val dataSplitsWithoutLimit2 = scanBuilder2.build().asInstanceOf[PaimonScan].inputSplits
     Assertions.assertEquals(4, dataSplitsWithoutLimit2.length)
     // Now, we have 4 dataSplits, and 2 dataSplit is nonRawConvertible, 2 dataSplit is rawConvertible.
     Assertions.assertEquals(
@@ -271,13 +271,13 @@ abstract class PaimonPushDownTestBase extends PaimonSparkTestBase {
 
     // Return 2 dataSplits.
     Assertions.assertFalse(scanBuilder2.asInstanceOf[SupportsPushDownLimit].pushLimit(2))
-    val dataSplitsWithLimit2 = scanBuilder2.build().asInstanceOf[PaimonScan].getOriginSplits
+    val dataSplitsWithLimit2 = scanBuilder2.build().asInstanceOf[PaimonScan].inputSplits
     Assertions.assertEquals(2, dataSplitsWithLimit2.length)
     Assertions.assertEquals(2, spark.sql("SELECT * FROM T LIMIT 2").count())
 
     // 2 dataSplits cannot meet the limit requirement, so need to scan all dataSplits.
     Assertions.assertFalse(scanBuilder2.asInstanceOf[SupportsPushDownLimit].pushLimit(3))
-    val dataSplitsWithLimit22 = scanBuilder2.build().asInstanceOf[PaimonScan].getOriginSplits
+    val dataSplitsWithLimit22 = scanBuilder2.build().asInstanceOf[PaimonScan].inputSplits
     // Need to scan all dataSplits.
     Assertions.assertEquals(4, dataSplitsWithLimit22.length)
     Assertions.assertEquals(3, spark.sql("SELECT * FROM T LIMIT 3").count())
@@ -285,7 +285,7 @@ abstract class PaimonPushDownTestBase extends PaimonSparkTestBase {
     // Case 3: Update the remaining 2 rawConvertible dataSplits to make all dataSplits is nonRawConvertible.
     spark.sql("INSERT INTO T VALUES (3, 'c', '11'), (4, 'd', '22')")
     val scanBuilder3 = getScanBuilder()
-    val dataSplitsWithoutLimit3 = scanBuilder3.build().asInstanceOf[PaimonScan].getOriginSplits
+    val dataSplitsWithoutLimit3 = scanBuilder3.build().asInstanceOf[PaimonScan].inputSplits
     Assertions.assertEquals(4, dataSplitsWithoutLimit3.length)
 
     // All dataSplits is nonRawConvertible.
@@ -295,7 +295,7 @@ abstract class PaimonPushDownTestBase extends PaimonSparkTestBase {
       })
 
     Assertions.assertFalse(scanBuilder3.asInstanceOf[SupportsPushDownLimit].pushLimit(1))
-    val dataSplitsWithLimit3 = scanBuilder3.build().asInstanceOf[PaimonScan].getOriginSplits
+    val dataSplitsWithLimit3 = scanBuilder3.build().asInstanceOf[PaimonScan].inputSplits
     // Need to scan all dataSplits.
     Assertions.assertEquals(4, dataSplitsWithLimit3.length)
     Assertions.assertEquals(1, spark.sql("SELECT * FROM T LIMIT 1").count())
@@ -321,12 +321,12 @@ abstract class PaimonPushDownTestBase extends PaimonSparkTestBase {
               sql("DELETE FROM T WHERE id % 13 = 0")
               Assertions.assertEquals(100, spark.sql("SELECT * FROM T LIMIT 100").count())
 
-              val withoutLimit = getScanBuilder().build().asInstanceOf[PaimonScan].getOriginSplits
+              val withoutLimit = getScanBuilder().build().asInstanceOf[PaimonScan].inputSplits
               assert(withoutLimit.length == 10)
 
               val scanBuilder = getScanBuilder().asInstanceOf[SupportsPushDownLimit]
               scanBuilder.pushLimit(1)
-              val withLimit = scanBuilder.build().asInstanceOf[PaimonScan].getOriginSplits
+              val withLimit = scanBuilder.build().asInstanceOf[PaimonScan].inputSplits
               if (deletionVectorsEnabled || !primaryKeyTable) {
                 assert(withLimit.length == 1)
               } else {
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala
index 08ceeceb3f..05704e3b36 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala
@@ -364,7 +364,7 @@ abstract class SparkV2FilterConverterTestBase extends PaimonSparkTestBase {
   }
 
   private def scanFilesCount(str: String, tableName: String = "test_tbl"): Int = {
-    getPaimonScan(s"SELECT * FROM $tableName WHERE $str").lazyInputPartitions
+    getPaimonScan(s"SELECT * FROM $tableName WHERE $str").inputPartitions
       .flatMap(_.splits)
       .map(_.asInstanceOf[DataSplit].dataFiles().size())
       .sum
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/util/ScanPlanHelperTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/util/ScanPlanHelperTest.scala
index df65a50095..fdb19684d4 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/util/ScanPlanHelperTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/util/ScanPlanHelperTest.scala
@@ -31,7 +31,7 @@ class ScanPlanHelperTest extends PaimonSparkTestBase with ScanPlanHelper {
       sql("CREATE TABLE t (id INT, data STRING) TBLPROPERTIES ('row-tracking.enabled' = 'true')")
       sql("INSERT INTO t VALUES (11, 'a'), (22, 'b')")
 
-      val splits = getPaimonScan("SELECT * FROM t").getOriginSplits.map(_.asInstanceOf[DataSplit])
+      val splits = getPaimonScan("SELECT * FROM t").inputSplits.map(_.asInstanceOf[DataSplit])
       val newScanPlan = createNewScanPlan(splits, createRelationV2("t"))
       val newDf = createDataset(spark, newScanPlan)
 
