Repository: spark
Updated Branches:
refs/heads/master c2c107aba -> 986a3b8b5
[SPARK-17796][SQL] Support wildcard character in filename for LOAD DATA LOCAL
INPATH
## What changes were proposed in this pull request?
Currently, Spark 2.0 raises an `input path does not exist` AnalysisException if
the file name contains '*'. It is misleading since it occurs when there exist
some matched files. Also, it was a supported feature in Spark 1.6.2. This PR
aims to support wildcard characters in filename for `LOAD DATA LOCAL INPATH`
SQL command like Spark 1.6.2.
**Reported Error Scenario**
```scala
scala> sql("CREATE TABLE t(a string)")
res0: org.apache.spark.sql.DataFrame = []
scala> sql("LOAD DATA LOCAL INPATH '/tmp/x*' INTO TABLE t")
org.apache.spark.sql.AnalysisException: LOAD DATA input path does not exist:
/tmp/x*;
```
## How was this patch tested?
Pass the Jenkins test with a new test case.
Author: Dongjoon Hyun <[email protected]>
Closes #15376 from dongjoon-hyun/SPARK-17796.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/986a3b8b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/986a3b8b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/986a3b8b
Branch: refs/heads/master
Commit: 986a3b8b5bedb1d64e2cf7c95bfdf5505f3e8c69
Parents: c2c107a
Author: Dongjoon Hyun <[email protected]>
Authored: Thu Oct 20 09:53:12 2016 +0100
Committer: Sean Owen <[email protected]>
Committed: Thu Oct 20 09:53:12 2016 +0100
----------------------------------------------------------------------
.../spark/sql/execution/command/tables.scala | 23 ++++++++++++++-
.../sql/hive/execution/SQLQuerySuite.scala | 30 ++++++++++++++++++++
2 files changed, 52 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/986a3b8b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 403b479..4c0675a 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.command
import java.io.File
import java.net.URI
+import java.nio.file.FileSystems
import java.util.Date
import scala.collection.mutable.ArrayBuffer
@@ -245,7 +246,27 @@ case class LoadDataCommand(
val loadPath =
if (isLocal) {
val uri = Utils.resolveURI(path)
- if (!new File(uri.getPath()).exists()) {
+ val filePath = uri.getPath()
+ val exists = if (filePath.contains("*")) {
+ val fileSystem = FileSystems.getDefault
+ val pathPattern = fileSystem.getPath(filePath)
+ val dir = pathPattern.getParent.toString
+ if (dir.contains("*")) {
+ throw new AnalysisException(
+ s"LOAD DATA input path allows only filename wildcard: $path")
+ }
+
+ val files = new File(dir).listFiles()
+ if (files == null) {
+ false
+ } else {
+ val matcher = fileSystem.getPathMatcher("glob:" +
pathPattern.toAbsolutePath)
+ files.exists(f =>
matcher.matches(fileSystem.getPath(f.getAbsolutePath)))
+ }
+ } else {
+ new File(filePath).exists()
+ }
+ if (!exists) {
throw new AnalysisException(s"LOAD DATA input path does not exist:
$path")
}
uri
http://git-wip-us.apache.org/repos/asf/spark/blob/986a3b8b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index e26b6b5..495b4f8 100644
---
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -17,11 +17,14 @@
package org.apache.spark.sql.hive.execution
+import java.io.{File, PrintWriter}
+import java.nio.charset.StandardCharsets
import java.sql.{Date, Timestamp}
import scala.sys.process.{Process, ProcessLogger}
import scala.util.Try
+import com.google.common.io.Files
import org.apache.hadoop.fs.Path
import org.apache.spark.sql._
@@ -1917,6 +1920,33 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils
with TestHiveSingleton {
}
}
+ test("SPARK-17796 Support wildcard character in filename for LOAD DATA LOCAL
INPATH") {
+ withTempDir { dir =>
+ for (i <- 1 to 3) {
+ Files.write(s"$i", new File(s"$dir/part-r-0000$i"),
StandardCharsets.UTF_8)
+ }
+ for (i <- 5 to 7) {
+ Files.write(s"$i", new File(s"$dir/part-s-0000$i"),
StandardCharsets.UTF_8)
+ }
+
+ withTable("load_t") {
+ sql("CREATE TABLE load_t (a STRING)")
+ sql(s"LOAD DATA LOCAL INPATH '$dir/*part-r*' INTO TABLE load_t")
+ checkAnswer(sql("SELECT * FROM load_t"), Seq(Row("1"), Row("2"),
Row("3")))
+
+ val m = intercept[AnalysisException] {
+ sql("LOAD DATA LOCAL INPATH '/non-exist-folder/*part*' INTO TABLE
load_t")
+ }.getMessage
+ assert(m.contains("LOAD DATA input path does not exist"))
+
+ val m2 = intercept[AnalysisException] {
+ sql(s"LOAD DATA LOCAL INPATH '$dir*/*part*' INTO TABLE load_t")
+ }.getMessage
+ assert(m2.contains("LOAD DATA input path allows only filename
wildcard"))
+ }
+ }
+ }
+
def testCommandAvailable(command: String): Boolean = {
val attempt = Try(Process(command).run(ProcessLogger(_ => ())).exitValue())
attempt.isSuccess && attempt.get == 0
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]