This is an automated email from the ASF dual-hosted git repository.
Zouxxyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 0f7b594b0f [spark] Reduce paimon-spark-4.0 shadows via SparkShim copy factories (#7721)
0f7b594b0f is described below
commit 0f7b594b0f3cbaada70bc0e81c8526ef4e6c5b6f
Author: Kerwin Zhang <[email protected]>
AuthorDate: Fri May 22 11:13:06 2026 +0800
[spark] Reduce paimon-spark-4.0 shadows via SparkShim copy factories (#7721)
---
.../apache/paimon/spark/util/ScanPlanHelper.scala | 79 ----------------------
.../apache/paimon/spark/util/ScanPlanHelper.scala | 6 +-
2 files changed, 5 insertions(+), 80 deletions(-)
diff --git a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/util/ScanPlanHelper.scala b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/util/ScanPlanHelper.scala
deleted file mode 100644
index 832291e379..0000000000
--- a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/util/ScanPlanHelper.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.spark.util
-
-import org.apache.paimon.spark.SparkTable
-import org.apache.paimon.spark.schema.PaimonMetadataColumn.{PATH_AND_INDEX_META_COLUMNS, ROW_TRACKING_META_COLUMNS}
-import org.apache.paimon.table.{InnerTable, KnownSplitsTable}
-import org.apache.paimon.table.source.Split
-
-import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.catalyst.SQLConfHelper
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.functions.col
-
-trait ScanPlanHelper extends SQLConfHelper {
-
- /** Create a new scan plan from a relation with the given data splits, condition(optional). */
- def createNewScanPlan(
- dataSplits: Seq[Split],
- relation: DataSourceV2Relation,
- condition: Option[Expression]): LogicalPlan = {
- val newRelation = createNewScanPlan(dataSplits, relation)
- condition match {
- case Some(c) if c != TrueLiteral => Filter(c, newRelation)
- case _ => newRelation
- }
- }
-
- def createNewScanPlan(
- dataSplits: Seq[Split],
- relation: DataSourceV2Relation): DataSourceV2Relation = {
- relation.table match {
- case sparkTable @ SparkTable(table: InnerTable) =>
- val knownSplitsTable = KnownSplitsTable.create(table, dataSplits.toArray)
- relation.copy(table = sparkTable.copy(table = knownSplitsTable))
- case _ => throw new RuntimeException()
- }
- }
-
- def selectWithDvMeta(data: DataFrame): DataFrame = {
- selectWithAdditionalCols(data, PATH_AND_INDEX_META_COLUMNS)
- }
-
- def selectWithRowTracking(data: DataFrame): DataFrame = {
- selectWithAdditionalCols(data, ROW_TRACKING_META_COLUMNS)
- }
-
- private def selectWithAdditionalCols(data: DataFrame, additionalCols: Seq[String]): DataFrame = {
- val dataColNames = data.schema.names
- val mergedColNames = dataColNames ++ additionalCols.filterNot(dataColNames.contains)
- data.select(mergedColNames.map(col): _*)
- }
-}
-
-/** This wrapper is only used in java code, e.g. Procedure. */
-object ScanPlanHelper extends ScanPlanHelper {
- def createNewScanPlan(dataSplits: Array[Split], relation: DataSourceV2Relation): LogicalPlan = {
- ScanPlanHelper.createNewScanPlan(dataSplits.toSeq, relation)
- }
-}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/ScanPlanHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/ScanPlanHelper.scala
index 832291e379..a26ae6d328 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/ScanPlanHelper.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/ScanPlanHelper.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.paimon.shims.SparkShimLoader
trait ScanPlanHelper extends SQLConfHelper {
@@ -51,7 +52,10 @@ trait ScanPlanHelper extends SQLConfHelper {
relation.table match {
case sparkTable @ SparkTable(table: InnerTable) =>
val knownSplitsTable = KnownSplitsTable.create(table, dataSplits.toArray)
- relation.copy(table = sparkTable.copy(table = knownSplitsTable))
+ SparkShimLoader.shim.copyDataSourceV2Relation(
+ relation,
+ sparkTable.copy(table = knownSplitsTable),
+ relation.output)
case _ => throw new RuntimeException()
}
}