This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b7edc5fac0f4 [SPARK-39910][SQL] Delegate path qualification to filesystem during DataSource file path globbing
b7edc5fac0f4 is described below

commit b7edc5fac0f4e479cbc869d54a9490c553ba2613
Author: Tigran Manasyan <t.manas...@arenadata.io>
AuthorDate: Thu Feb 8 20:29:09 2024 +0800

    [SPARK-39910][SQL] Delegate path qualification to filesystem during DataSource file path globbing
    
    ### What changes were proposed in this pull request?
    
    In the current version, `DataSource#checkAndGlobPathIfNecessary` qualifies
    paths via `Path#makeQualified`, while `PartitioningAwareFileIndex` qualifies
    them via `FileSystem#makeQualified`. Most `FileSystem` implementations simply
    delegate to `Path#makeQualified`, but others, such as `HarFileSystem`,
    contain fs-specific logic that can produce a different result. Such
    inconsistencies can lead to a situation where Spark can't find partitions of
    the source file, because the qualified paths built by `Path` and [...]
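    
    For illustration, a minimal sketch of the two qualification styles side by
    side (the archive path here is hypothetical, for illustration only):
    
    ```scala
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    
    val hadoopConf = new Configuration()
    // Hypothetical archive path, for illustration only.
    val path = new Path("har:///data/test-archive.har/test.csv")
    val fs = path.getFileSystem(hadoopConf)
    
    // Before: generic qualification, which ignores fs-specific overrides.
    val viaPath = path.makeQualified(fs.getUri, fs.getWorkingDirectory)
    
    // After: the filesystem qualifies the path itself; `HarFileSystem`
    // overrides `makeQualified`, so this matches what
    // `PartitioningAwareFileIndex` builds for the same path.
    val viaFs = fs.makeQualified(path)
    ```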
    
    ### Why are the changes needed?
    
    Allow users to read files from Hadoop archives (`.har`) using the
    DataFrameReader API.
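    
    For example, after this change a CSV file inside an archive can be read
    directly through the standard reader (the har path is hypothetical):
    
    ```scala
    // Hypothetical har path, for illustration only.
    val df = spark.read
      .schema("str STRING")
      .csv("har:///user/hadoop/archives/test-archive.har/test.csv")
    ```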
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    New tests were added in `DataSourceSuite` and `DataFrameReaderWriterSuite`
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #43463 from tigrulya-exe/SPARK-39910-use-fs-path-qualification.
    
    Authored-by: Tigran Manasyan <t.manas...@arenadata.io>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 dev/.rat-excludes                                                 | 1 +
 .../org/apache/spark/sql/execution/datasources/DataSource.scala   | 2 +-
 sql/core/src/test/resources/test-data/test-archive.har/_index     | 2 ++
 .../src/test/resources/test-data/test-archive.har/_masterindex    | 2 ++
 sql/core/src/test/resources/test-data/test-archive.har/part-0     | 3 +++
 .../apache/spark/sql/execution/datasources/DataSourceSuite.scala  | 4 ++++
 .../org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala    | 8 ++++++++
 7 files changed, 21 insertions(+), 1 deletion(-)

diff --git a/dev/.rat-excludes b/dev/.rat-excludes
index 4cf5deb81192..8bad50951a78 100644
--- a/dev/.rat-excludes
+++ b/dev/.rat-excludes
@@ -138,3 +138,4 @@ people.xml
 ui-test/package.json
 ui-test/package-lock.json
 core/src/main/resources/org/apache/spark/ui/static/package.json
+.*\.har
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 3e03fd652f18..837cfb0de092 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -760,7 +760,7 @@ object DataSource extends Logging {
     val qualifiedPaths = pathStrings.map { pathString =>
       val path = new Path(pathString)
       val fs = path.getFileSystem(hadoopConf)
-      path.makeQualified(fs.getUri, fs.getWorkingDirectory)
+      fs.makeQualified(path)
     }
 
     // Split the paths into glob and non glob paths, because we don't need to do an existence check
diff --git a/sql/core/src/test/resources/test-data/test-archive.har/_index b/sql/core/src/test/resources/test-data/test-archive.har/_index
new file mode 100644
index 000000000000..b7ae3ef9c5a4
--- /dev/null
+++ b/sql/core/src/test/resources/test-data/test-archive.har/_index
@@ -0,0 +1,2 @@
+%2F dir 1707380620211+493+tigrulya+hadoop 0 0 test.csv 
+%2Ftest.csv file part-0 0 6 1707380620197+420+tigrulya+hadoop 
diff --git a/sql/core/src/test/resources/test-data/test-archive.har/_masterindex b/sql/core/src/test/resources/test-data/test-archive.har/_masterindex
new file mode 100644
index 000000000000..4192a9597299
--- /dev/null
+++ b/sql/core/src/test/resources/test-data/test-archive.har/_masterindex
@@ -0,0 +1,2 @@
+3 
+0 1948547033 0 119 
diff --git a/sql/core/src/test/resources/test-data/test-archive.har/part-0 b/sql/core/src/test/resources/test-data/test-archive.har/part-0
new file mode 100644
index 000000000000..01e79c32a8c9
--- /dev/null
+++ b/sql/core/src/test/resources/test-data/test-archive.har/part-0
@@ -0,0 +1,3 @@
+1
+2
+3
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala
index 06e570cb016b..90b341ae1f2c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import java.net.URI
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
 import org.scalatest.PrivateMethodTester
@@ -214,4 +216,6 @@ class MockFileSystem extends RawLocalFileSystem {
   override def globStatus(pathPattern: Path): Array[FileStatus] = {
     mockGlobResults.getOrElse(pathPattern, Array())
   }
+
+  override def getUri: URI = URI.create("mockFs://mockFs/")
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index 4b97a0c6b7df..603ee74ce333 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -1363,4 +1363,12 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with
       }
     }
   }
+
+  test("SPARK-39910: read files from Hadoop archives") {
+    val fileSchema = new StructType().add("str", StringType)
+    val harPath = testFile("test-data/test-archive.har")
+      .replaceFirst("file:/", "har:/")
+
+    testRead(spark.read.schema(fileSchema).csv(s"$harPath/test.csv"), data, fileSchema)
+  }
 }

