This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new fe298c3 [SPARK-31935][SQL][3.0] Hadoop file system config should be effective in data source options
fe298c3 is described below
commit fe298c34e11823cb1371db1bff425ce8874fface
Author: Gengliang Wang <[email protected]>
AuthorDate: Thu Jun 11 14:18:19 2020 -0700
[SPARK-31935][SQL][3.0] Hadoop file system config should be effective in data source options
### What changes were proposed in this pull request?
Make Hadoop file system config effective in data source options.
From `org.apache.hadoop.fs.FileSystem.java`:
```
public static FileSystem get(URI uri, Configuration conf) throws IOException {
  String scheme = uri.getScheme();
  String authority = uri.getAuthority();

  if (scheme == null && authority == null) {     // use default FS
    return get(conf);
  }

  if (scheme != null && authority == null) {     // no authority
    URI defaultUri = getDefaultUri(conf);
    if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
        && defaultUri.getAuthority() != null) {  // & default has authority
      return get(defaultUri, conf);              // return default
    }
  }
  String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
  if (conf.getBoolean(disableCacheName, false)) {
    return createFileSystem(uri, conf);
  }

  return CACHE.get(uri, conf);
}
```
Before this change, the file system configurations passed as data source options were not propagated in `DataSource.scala`.
After this change, we can specify authority- and URI-scheme-related configurations for the file systems being scanned.
This problem only exists in data source V1. In V2, we already use
`sparkSession.sessionState.newHadoopConfWithOptions(options)` in `FileTable`.
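As a minimal sketch (not part of this patch; the function name and `sessionConf` parameter are illustrative), the propagation is essentially an overlay of the data source options on top of the session's Hadoop configuration, which is what `newHadoopConfWithOptions` provides:
```scala
import org.apache.hadoop.conf.Configuration

// Simplified approximation: copy the session's Hadoop configuration and overlay
// the per-query data source options on top, so keys such as "fs.defaultFS"
// take effect when the file system for a path is resolved.
def hadoopConfWithOptions(
    sessionConf: Configuration,
    options: Map[String, String]): Configuration = {
  val conf = new Configuration(sessionConf) // copy; do not mutate the session conf
  options.foreach { case (key, value) => conf.set(key, value) }
  conf
}
```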
### Why are the changes needed?
Allow users to specify authority- and URI-scheme-related Hadoop configurations for file source reading.
### Does this PR introduce _any_ user-facing change?
Yes, file-system-related Hadoop configurations set in data source options now take effect when reading.
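For example (an illustrative sketch only; the S3A endpoint value and bucket path below are hypothetical), a file system setting can now be scoped to a single read:
```scala
// Hypothetical usage: override a Hadoop file system setting for one read.
// "fs.s3a.endpoint" is a standard Hadoop S3A key; the endpoint and bucket are made up.
val df = spark.read
  .option("fs.s3a.endpoint", "http://localhost:9000")
  .parquet("s3a://some-bucket/some/path")
```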
### How was this patch tested?
Unit test
Closes #28776 from gengliangwang/SPARK-31935-3.0.
Lead-authored-by: Gengliang Wang <[email protected]>
Co-authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../spark/sql/execution/datasources/DataSource.scala | 13 +++++++------
.../apache/spark/sql/FileBasedDataSourceSuite.scala | 20 ++++++++++++++++++++
.../spark/sql/streaming/FileStreamSourceSuite.scala | 12 ++++++++++++
3 files changed, 39 insertions(+), 6 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 3615afc..588a9b4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -109,6 +109,9 @@ case class DataSource(
   private def providingInstance() = providingClass.getConstructor().newInstance()
 
+  private def newHadoopConfiguration(): Configuration =
+    sparkSession.sessionState.newHadoopConfWithOptions(options)
+
   lazy val sourceInfo: SourceInfo = sourceSchema()
   private val caseInsensitiveOptions = CaseInsensitiveMap(options)
   private val equality = sparkSession.sessionState.conf.resolver
@@ -230,7 +233,7 @@ case class DataSource(
         // once the streaming job starts and some upstream source starts dropping data.
         val hdfsPath = new Path(path)
         if (!SparkHadoopUtil.get.isGlobPath(hdfsPath)) {
-          val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
+          val fs = hdfsPath.getFileSystem(newHadoopConfiguration())
           if (!fs.exists(hdfsPath)) {
             throw new AnalysisException(s"Path does not exist: $path")
           }
@@ -357,7 +360,7 @@ case class DataSource(
       case (format: FileFormat, _)
           if FileStreamSink.hasMetadata(
             caseInsensitiveOptions.get("path").toSeq ++ paths,
-            sparkSession.sessionState.newHadoopConf(),
+            newHadoopConfiguration(),
             sparkSession.sessionState.conf) =>
         val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head)
         val fileCatalog = new MetadataLogFileIndex(sparkSession, basePath,
@@ -449,7 +452,7 @@ case class DataSource(
     val allPaths = paths ++ caseInsensitiveOptions.get("path")
     val outputPath = if (allPaths.length == 1) {
       val path = new Path(allPaths.head)
-      val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
+      val fs = path.getFileSystem(newHadoopConfiguration())
       path.makeQualified(fs.getUri, fs.getWorkingDirectory)
     } else {
       throw new IllegalArgumentException("Expected exactly one path to be specified, but " +
@@ -569,9 +572,7 @@ case class DataSource(
       checkEmptyGlobPath: Boolean,
       checkFilesExist: Boolean): Seq[Path] = {
     val allPaths = caseInsensitiveOptions.get("path") ++ paths
-    val hadoopConf = sparkSession.sessionState.newHadoopConf()
-
-    DataSource.checkAndGlobPathIfNecessary(allPaths.toSeq, hadoopConf,
+    DataSource.checkAndGlobPathIfNecessary(allPaths.toSeq, newHadoopConfiguration(),
       checkEmptyGlobPath, checkFilesExist)
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index cb410b4..d8157d3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -843,6 +843,26 @@ class FileBasedDataSourceSuite extends QueryTest
     }
   }
 
+  test("SPARK-31935: Hadoop file system config should be effective in data source options") {
+    Seq("parquet", "").foreach { format =>
+      withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> format) {
+        withTempDir { dir =>
+          val path = dir.getCanonicalPath
+          val defaultFs = "nonexistFS://nonexistFS"
+          val expectMessage = "No FileSystem for scheme nonexistFS"
+          val message1 = intercept[java.io.IOException] {
+            spark.range(10).write.option("fs.defaultFS", defaultFs).parquet(path)
+          }.getMessage
+          assert(message1.filterNot(Set(':', '"').contains) == expectMessage)
+          val message2 = intercept[java.io.IOException] {
+            spark.read.option("fs.defaultFS", defaultFs).parquet(path)
+          }.getMessage
+          assert(message2.filterNot(Set(':', '"').contains) == expectMessage)
+        }
+      }
+    }
+  }
+
   test("SPARK-31116: Select nested schema with case insensitive mode") {
     // This test case failed at only Parquet. ORC is added for test coverage parity.
     Seq("orc", "parquet").foreach { format =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index fa32033..7b16aeb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -532,6 +532,18 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }
 
+  test("SPARK-31935: Hadoop file system config should be effective in data source options") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath
+      val defaultFs = "nonexistFS://nonexistFS"
+      val expectMessage = "No FileSystem for scheme nonexistFS"
+      val message = intercept[java.io.IOException] {
+        spark.readStream.option("fs.defaultFS", defaultFs).text(path)
+      }.getMessage
+      assert(message.filterNot(Set(':', '"').contains) == expectMessage)
+    }
+  }
+
   test("read from textfile") {
     withTempDirs { case (src, tmp) =>
       val textStream = spark.readStream.textFile(src.getCanonicalPath)