This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new 7658f77a613c [SPARK-39910][SQL] Delegate path qualification to
filesystem during DataSource file path globbing
7658f77a613c is described below
commit 7658f77a613c91364c4b6c986e1861c7bd5487db
Author: Tigran Manasyan <[email protected]>
AuthorDate: Thu Feb 8 20:29:09 2024 +0800
[SPARK-39910][SQL] Delegate path qualification to filesystem during
DataSource file path globbing
In current version `DataSource#checkAndGlobPathIfNecessary` qualifies paths
via `Path#makeQualified` and `PartitioningAwareFileIndex` qualifies via
`FileSystem#makeQualified`. Most `FileSystem` implementations simply delegate
to `Path#makeQualified`, but others, like `HarFileSystem` contain fs-specific
logic, that can produce different result. Such inconsistencies can lead to a
situation, when spark can't find partitions of the source file, because
qualified paths, built by `Path` and [...]
Allow users to read files from hadoop archives (.har) using DataFrameReader
API
No
New tests were added in `DataSourceSuite` and `DataFrameReaderWriterSuite`
No
Closes #43463 from tigrulya-exe/SPARK-39910-use-fs-path-qualification.
Authored-by: Tigran Manasyan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit b7edc5fac0f4e479cbc869d54a9490c553ba2613)
Signed-off-by: Wenchen Fan <[email protected]>
---
dev/.rat-excludes | 1 +
.../org/apache/spark/sql/execution/datasources/DataSource.scala | 2 +-
sql/core/src/test/resources/test-data/test-archive.har/_index | 2 ++
.../src/test/resources/test-data/test-archive.har/_masterindex | 2 ++
sql/core/src/test/resources/test-data/test-archive.har/part-0 | 3 +++
.../apache/spark/sql/execution/datasources/DataSourceSuite.scala | 4 ++++
.../org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala | 8 ++++++++
7 files changed, 21 insertions(+), 1 deletion(-)
diff --git a/dev/.rat-excludes b/dev/.rat-excludes
index 16e0e3e30c9e..6bf840cee283 100644
--- a/dev/.rat-excludes
+++ b/dev/.rat-excludes
@@ -145,3 +145,4 @@ empty.proto
.*\.proto.bin
LimitedInputStream.java
TimSort.java
+.*\.har
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 94dd3bc0bd63..2e24087d507b 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -722,7 +722,7 @@ object DataSource extends Logging {
val qualifiedPaths = pathStrings.map { pathString =>
val path = new Path(pathString)
val fs = path.getFileSystem(hadoopConf)
- path.makeQualified(fs.getUri, fs.getWorkingDirectory)
+ fs.makeQualified(path)
}
// Split the paths into glob and non glob paths, because we don't need to
do an existence check
diff --git a/sql/core/src/test/resources/test-data/test-archive.har/_index
b/sql/core/src/test/resources/test-data/test-archive.har/_index
new file mode 100644
index 000000000000..b7ae3ef9c5a4
--- /dev/null
+++ b/sql/core/src/test/resources/test-data/test-archive.har/_index
@@ -0,0 +1,2 @@
+%2F dir 1707380620211+493+tigrulya+hadoop 0 0 test.csv
+%2Ftest.csv file part-0 0 6 1707380620197+420+tigrulya+hadoop
diff --git
a/sql/core/src/test/resources/test-data/test-archive.har/_masterindex
b/sql/core/src/test/resources/test-data/test-archive.har/_masterindex
new file mode 100644
index 000000000000..4192a9597299
--- /dev/null
+++ b/sql/core/src/test/resources/test-data/test-archive.har/_masterindex
@@ -0,0 +1,2 @@
+3
+0 1948547033 0 119
diff --git a/sql/core/src/test/resources/test-data/test-archive.har/part-0
b/sql/core/src/test/resources/test-data/test-archive.har/part-0
new file mode 100644
index 000000000000..01e79c32a8c9
--- /dev/null
+++ b/sql/core/src/test/resources/test-data/test-archive.har/part-0
@@ -0,0 +1,3 @@
+1
+2
+3
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala
index 06e570cb016b..90b341ae1f2c 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution.datasources
+import java.net.URI
+
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
import org.scalatest.PrivateMethodTester
@@ -214,4 +216,6 @@ class MockFileSystem extends RawLocalFileSystem {
override def globStatus(pathPattern: Path): Array[FileStatus] = {
mockGlobResults.getOrElse(pathPattern, Array())
}
+
+ override def getUri: URI = URI.create("mockFs://mockFs/")
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index 17348fe2dcbb..b40f9210a686 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -1363,4 +1363,12 @@ class DataFrameReaderWriterSuite extends QueryTest with
SharedSparkSession with
}
}
}
+
+ test("SPARK-39910: read files from Hadoop archives") {
+ val fileSchema = new StructType().add("str", StringType)
+ val harPath = testFile("test-data/test-archive.har")
+ .replaceFirst("file:/", "har:/")
+
+ testRead(spark.read.schema(fileSchema).csv(s"$harPath/test.csv"), data,
fileSchema)
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]