This is an automated email from the ASF dual-hosted git repository.
zouxxyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b510fa858 [spark] use metadata column to replace input_file_name udf
(#3412)
b510fa858 is described below
commit b510fa858f37c74187753fef79ef300376fdba1c
Author: Yann Byron <[email protected]>
AuthorDate: Wed May 29 21:38:44 2024 +0800
[spark] use metadata column to replace input_file_name udf (#3412)
---
.../org/apache/paimon/spark/commands/PaimonCommand.scala | 14 ++++++++------
1 file changed, 8 insertions(+), 6 deletions(-)
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
index 915def57d..f729c290c 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
@@ -39,7 +39,6 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute,
Expression}
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
import org.apache.spark.sql.catalyst.plans.logical.{Filter =>
FilterLogicalNode, Project}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.sources.{AlwaysTrue, And, EqualNullSafe, Filter}
import java.net.URI
@@ -118,7 +117,6 @@ trait PaimonCommand extends WithFileStoreTable with
ExpressionHelper {
sparkSession: SparkSession): Array[String] = {
import sparkSession.implicits._
- // only raw convertible can generate input_file_name()
for (split <- candidateDataSplits) {
if (!split.rawConvertible()) {
throw new IllegalArgumentException(
@@ -126,13 +124,17 @@ trait PaimonCommand extends WithFileStoreTable with
ExpressionHelper {
}
}
- val scan = PaimonSplitScan(table, candidateDataSplits.toArray)
- val filteredRelation =
+ val metadataCols = Seq(FILE_PATH)
+ val metadataProj = metadataCols.map(_.toAttribute)
+ val newRelation = relation.copy(output = relation.output ++ metadataProj)
+ val scan = PaimonSplitScan(table, candidateDataSplits.toArray,
metadataCols)
+ val filteredRelation = Project(
+ metadataProj,
FilterLogicalNode(
condition,
- Compatibility.createDataSourceV2ScanRelation(relation, scan,
relation.output))
+ Compatibility.createDataSourceV2ScanRelation(newRelation, scan,
newRelation.output)))
createDataset(sparkSession, filteredRelation)
- .select(input_file_name())
+ .select(FILE_PATH_COLUMN)
.distinct()
.as[String]
.collect()