This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 8c6442fccfb [SPARK-43284] Switch back to url-encoded strings
8c6442fccfb is described below

commit 8c6442fccfbfb648d38af0621ddbfd8300d6f973
Author: David Lewis <[email protected]>
AuthorDate: Fri May 5 10:30:44 2023 +0800

    [SPARK-43284] Switch back to url-encoded strings
    
    Update `_metadata.file_path` and `_metadata.file_name` to return 
url-encoded strings, rather than un-encoded strings. This was a regression 
introduced in Spark 3.4.0.
    
    This was an inadvertent behavior change.
    
    Yes, fix regression!
    
    New test added to validate that the `file_path` and `file_name` are 
returned as encoded strings.
    
    Closes #40947 from databricks-david-lewis/SPARK-43284.
    
    Authored-by: David Lewis <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
    (cherry picked from commit 1a777967ed109b0838793588c330a2e404627fb1)
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../sql/execution/datasources/FileFormat.scala     |  9 +++++--
 .../sql/execution/datasources/FileScanRDD.scala    |  7 +++--
 .../datasources/FileMetadataStructSuite.scala      | 31 ++++++++++++++++++++++
 3 files changed, 43 insertions(+), 4 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
index 3d7e2c8bf3e..decd0c30b00 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
@@ -255,8 +255,13 @@ object FileFormat {
       fileModificationTime: Long): InternalRow = {
     fieldNames.zipWithIndex.foreach { case (name, i) =>
       name match {
-        case FILE_PATH => row.update(i, 
UTF8String.fromString(filePath.toString))
-        case FILE_NAME => row.update(i, 
UTF8String.fromString(filePath.getName))
+        case FILE_PATH =>
+          // Use new Path(Path.toString).toString as a form of canonicalization
+          val pathString = new Path(filePath.toString).toString
+          row.update(i, UTF8String.fromString(pathString))
+        case FILE_NAME =>
+          val fileName = 
filePath.toUri.getRawPath.split("/").lastOption.getOrElse("")
+          row.update(i, UTF8String.fromString(fileName))
         case FILE_SIZE => row.update(i, fileSize)
         case FILE_BLOCK_START => row.update(i, fileBlockStart)
         case FILE_BLOCK_LENGTH => row.update(i, fileBlockLength)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 7fb2d9c8ac7..eebd92dad6e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -159,11 +159,14 @@ class FileScanRDD(
         metadataColumns.map(_.name).map {
           case FILE_PATH =>
             val columnVector = new ConstantColumnVector(c.numRows(), 
StringType)
-            columnVector.setUtf8String(UTF8String.fromString(path.toString))
+            // Use new Path(Path.toString).toString as a form of 
canonicalization
+            val pathString = new Path(path.toString).toString
+            columnVector.setUtf8String(UTF8String.fromString(pathString))
             columnVector
           case FILE_NAME =>
             val columnVector = new ConstantColumnVector(c.numRows(), 
StringType)
-            columnVector.setUtf8String(UTF8String.fromString(path.getName))
+            val fileName = 
path.toUri.getRawPath.split("/").lastOption.getOrElse("")
+            columnVector.setUtf8String(UTF8String.fromString(fileName))
             columnVector
           case FILE_SIZE =>
             val columnVector = new ConstantColumnVector(c.numRows(), LongType)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
index f22181160a3..ee207f68e34 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
@@ -22,6 +22,7 @@ import java.sql.Timestamp
 import java.text.SimpleDateFormat
 
 import org.apache.spark.TestUtils
+import org.apache.spark.paths.SparkPath
 import org.apache.spark.sql.{AnalysisException, Column, DataFrame, QueryTest, 
Row}
 import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.functions._
@@ -877,4 +878,34 @@ class FileMetadataStructSuite extends QueryTest with 
SharedSparkSession {
       }
     }
   }
+
+
+  Seq("parquet", "json", "csv", "text", "orc").foreach { format =>
+    test(s"metadata file path and name are url encoded for format: $format") {
+      val suffix = if (format == "text") ".txt" else s".$format"
+      withTempPath { f =>
+        val dirWithSpace = s"$f/with space"
+        spark.range(10)
+          .selectExpr("cast(id as string) as str")
+          .repartition(1)
+          .write
+          .format(format)
+          .mode("append")
+          .save(dirWithSpace)
+
+        val pathWithSpace = s"$dirWithSpace/file with space.$suffix"
+        new File(dirWithSpace)
+          .listFiles((_, f) => f.endsWith(suffix))
+          .headOption
+          .getOrElse(fail(s"no file with suffix $suffix in $dirWithSpace"))
+          .renameTo(new File(pathWithSpace))
+
+        val encodedPath = SparkPath.fromPathString(pathWithSpace).urlEncoded
+        val encodedName = encodedPath.split("/").last
+        val df = spark.read.format(format).load(dirWithSpace)
+        val metadataName = df.select(METADATA_FILE_NAME).as[String].head()
+        assert(metadataName == encodedName)
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to