This is an automated email from the ASF dual-hosted git repository.

zouxxyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b510fa858 [spark] use metadata column to replace inpute_file_name udf 
(#3412)
b510fa858 is described below

commit b510fa858f37c74187753fef79ef300376fdba1c
Author: Yann Byron <[email protected]>
AuthorDate: Wed May 29 21:38:44 2024 +0800

    [spark] use metadata column to replace inpute_file_name udf (#3412)
---
 .../org/apache/paimon/spark/commands/PaimonCommand.scala   | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)

diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
index 915def57d..f729c290c 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
@@ -39,7 +39,6 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, 
Expression}
 import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
 import org.apache.spark.sql.catalyst.plans.logical.{Filter => 
FilterLogicalNode, Project}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.functions.input_file_name
 import org.apache.spark.sql.sources.{AlwaysTrue, And, EqualNullSafe, Filter}
 
 import java.net.URI
@@ -118,7 +117,6 @@ trait PaimonCommand extends WithFileStoreTable with 
ExpressionHelper {
       sparkSession: SparkSession): Array[String] = {
     import sparkSession.implicits._
 
-    // only raw convertible can generate input_file_name()
     for (split <- candidateDataSplits) {
       if (!split.rawConvertible()) {
         throw new IllegalArgumentException(
@@ -126,13 +124,17 @@ trait PaimonCommand extends WithFileStoreTable with 
ExpressionHelper {
       }
     }
 
-    val scan = PaimonSplitScan(table, candidateDataSplits.toArray)
-    val filteredRelation =
+    val metadataCols = Seq(FILE_PATH)
+    val metadataProj = metadataCols.map(_.toAttribute)
+    val newRelation = relation.copy(output = relation.output ++ metadataProj)
+    val scan = PaimonSplitScan(table, candidateDataSplits.toArray, 
metadataCols)
+    val filteredRelation = Project(
+      metadataProj,
       FilterLogicalNode(
         condition,
-        Compatibility.createDataSourceV2ScanRelation(relation, scan, 
relation.output))
+        Compatibility.createDataSourceV2ScanRelation(newRelation, scan, 
newRelation.output)))
     createDataset(sparkSession, filteredRelation)
-      .select(input_file_name())
+      .select(FILE_PATH_COLUMN)
       .distinct()
       .as[String]
       .collect()

Reply via email to