This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new b7edc5fac0f4 [SPARK-39910][SQL] Delegate path qualification to filesystem during DataSource file path globbing b7edc5fac0f4 is described below commit b7edc5fac0f4e479cbc869d54a9490c553ba2613 Author: Tigran Manasyan <t.manas...@arenadata.io> AuthorDate: Thu Feb 8 20:29:09 2024 +0800 [SPARK-39910][SQL] Delegate path qualification to filesystem during DataSource file path globbing ### What changes were proposed in this pull request? In the current version, `DataSource#checkAndGlobPathIfNecessary` qualifies paths via `Path#makeQualified` and `PartitioningAwareFileIndex` qualifies via `FileSystem#makeQualified`. Most `FileSystem` implementations simply delegate to `Path#makeQualified`, but others, like `HarFileSystem`, contain fs-specific logic that can produce a different result. Such inconsistencies can lead to a situation where Spark can't find partitions of the source file, because qualified paths, built by `Path` and [...] ### Why are the changes needed? Allow users to read files from Hadoop archives (.har) using the DataFrameReader API ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? New tests were added in `DataSourceSuite` and `DataFrameReaderWriterSuite` ### Was this patch authored or co-authored using generative AI tooling? No Closes #43463 from tigrulya-exe/SPARK-39910-use-fs-path-qualification. 
Authored-by: Tigran Manasyan <t.manas...@arenadata.io> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- dev/.rat-excludes | 1 + .../org/apache/spark/sql/execution/datasources/DataSource.scala | 2 +- sql/core/src/test/resources/test-data/test-archive.har/_index | 2 ++ .../src/test/resources/test-data/test-archive.har/_masterindex | 2 ++ sql/core/src/test/resources/test-data/test-archive.har/part-0 | 3 +++ .../apache/spark/sql/execution/datasources/DataSourceSuite.scala | 4 ++++ .../org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala | 8 ++++++++ 7 files changed, 21 insertions(+), 1 deletion(-) diff --git a/dev/.rat-excludes b/dev/.rat-excludes index 4cf5deb81192..8bad50951a78 100644 --- a/dev/.rat-excludes +++ b/dev/.rat-excludes @@ -138,3 +138,4 @@ people.xml ui-test/package.json ui-test/package-lock.json core/src/main/resources/org/apache/spark/ui/static/package.json +.*\.har diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 3e03fd652f18..837cfb0de092 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -760,7 +760,7 @@ object DataSource extends Logging { val qualifiedPaths = pathStrings.map { pathString => val path = new Path(pathString) val fs = path.getFileSystem(hadoopConf) - path.makeQualified(fs.getUri, fs.getWorkingDirectory) + fs.makeQualified(path) } // Split the paths into glob and non glob paths, because we don't need to do an existence check diff --git a/sql/core/src/test/resources/test-data/test-archive.har/_index b/sql/core/src/test/resources/test-data/test-archive.har/_index new file mode 100644 index 000000000000..b7ae3ef9c5a4 --- /dev/null +++ b/sql/core/src/test/resources/test-data/test-archive.har/_index @@ -0,0 +1,2 @@ +%2F dir 
1707380620211+493+tigrulya+hadoop 0 0 test.csv +%2Ftest.csv file part-0 0 6 1707380620197+420+tigrulya+hadoop diff --git a/sql/core/src/test/resources/test-data/test-archive.har/_masterindex b/sql/core/src/test/resources/test-data/test-archive.har/_masterindex new file mode 100644 index 000000000000..4192a9597299 --- /dev/null +++ b/sql/core/src/test/resources/test-data/test-archive.har/_masterindex @@ -0,0 +1,2 @@ +3 +0 1948547033 0 119 diff --git a/sql/core/src/test/resources/test-data/test-archive.har/part-0 b/sql/core/src/test/resources/test-data/test-archive.har/part-0 new file mode 100644 index 000000000000..01e79c32a8c9 --- /dev/null +++ b/sql/core/src/test/resources/test-data/test-archive.har/part-0 @@ -0,0 +1,3 @@ +1 +2 +3 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala index 06e570cb016b..90b341ae1f2c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.datasources +import java.net.URI + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem} import org.scalatest.PrivateMethodTester @@ -214,4 +216,6 @@ class MockFileSystem extends RawLocalFileSystem { override def globStatus(pathPattern: Path): Array[FileStatus] = { mockGlobResults.getOrElse(pathPattern, Array()) } + + override def getUri: URI = URI.create("mockFs://mockFs/") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala index 4b97a0c6b7df..603ee74ce333 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala @@ -1363,4 +1363,12 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with } } } + + test("SPARK-39910: read files from Hadoop archives") { + val fileSchema = new StructType().add("str", StringType) + val harPath = testFile("test-data/test-archive.har") + .replaceFirst("file:/", "har:/") + + testRead(spark.read.schema(fileSchema).csv(s"$harPath/test.csv"), data, fileSchema) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org