Repository: spark
Updated Branches:
refs/heads/master 771abeb46 -> 3d2131ab4
[SPARK-20590][SQL] Use Spark internal datasource if multiple sources are found
for the same short name
## What changes were proposed in this pull request?
One of the common usability problems when reading data in Spark (particularly
CSV) is that different readers on the classpath can conflict with each other.
As an example, if someone launches a 2.x Spark shell with the spark-csv package
on the classpath, Spark currently fails in an extremely unfriendly way (see
databricks/spark-csv#367):
```bash
./bin/spark-shell --packages com.databricks:spark-csv_2.11:1.5.0
scala> val df = spark.read.csv("/foo/bar.csv")
java.lang.RuntimeException: Multiple sources found for csv
(org.apache.spark.sql.execution.datasources.csv.CSVFileFormat,
com.databricks.spark.csv.DefaultSource15), please specify the fully qualified
class name.
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:574)
at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:85)
at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:85)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:295)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:533)
at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:412)
... 48 elided
```
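This PR proposes a simple fix: when multiple sources are found for a short name
and exactly one of them is internal (its class name has the "org.apache.spark"
prefix), Spark picks the internal datasource and logs a warning. In every other
case it still fails and asks for the fully qualified class name.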
```scala
scala> spark.range(1).write.format("csv").mode("overwrite").save("/tmp/abc")
17/05/10 09:47:44 WARN DataSource: Multiple sources found for csv
(org.apache.spark.sql.execution.datasources.csv.CSVFileFormat,
com.databricks.spark.csv.DefaultSource15), defaulting to the internal
datasource (org.apache.spark.sql.execution.datasources.csv.CSVFileFormat).
```
```scala
scala> spark.range(1).write.format("Csv").mode("overwrite").save("/tmp/abc")
17/05/10 09:47:52 WARN DataSource: Multiple sources found for Csv
(org.apache.spark.sql.execution.datasources.csv.CSVFileFormat,
com.databricks.spark.csv.DefaultSource15), defaulting to the internal
datasource (org.apache.spark.sql.execution.datasources.csv.CSVFileFormat).
```
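The selection rule can also be sketched in isolation. The block below is a minimal standalone illustration, not the patch itself: `SourceResolution`, `resolve`, and the `Either` return type are hypothetical names chosen for this example, and it operates on class-name strings rather than on loaded `DataSourceRegister` instances. The empty-candidates branch is a simplification as well; the real lookup falls back to loading the provider by class name.

```scala
object SourceResolution {
  // Hypothetical helper (not part of Spark): given the class names registered
  // under one short name, return Right(chosen class) or Left(error message).
  def resolve(shortName: String, candidates: Seq[String]): Either[String, String] =
    candidates match {
      // Simplification: the real code tries to load the name as a class here.
      case Seq() => Left(s"Failed to find data source: $shortName")
      // Exactly one registered alias: use it.
      case Seq(single) => Right(single)
      case many =>
        // Prefer the internal source only when exactly one candidate is internal.
        val internal = many.filter(_.startsWith("org.apache.spark"))
        if (internal.size == 1) {
          Right(internal.head)
        } else {
          Left(s"Multiple sources found for $shortName (${many.mkString(", ")}), " +
            "please specify the fully qualified class name.")
        }
    }
}
```

With this sketch, a conflict between the built-in CSV source and `com.databricks.spark.csv.DefaultSource15` resolves to the built-in one, while two external sources (or two internal ones) sharing a short name still produce an error.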
## How was this patch tested?
Manually tested as below:
```bash
./bin/spark-shell --packages com.databricks:spark-csv_2.11:1.5.0
```
```scala
spark.sparkContext.setLogLevel("WARN")
```
**positive cases**:
```scala
scala> spark.range(1).write.format("csv").mode("overwrite").save("/tmp/abc")
17/05/10 09:47:44 WARN DataSource: Multiple sources found for csv
(org.apache.spark.sql.execution.datasources.csv.CSVFileFormat,
com.databricks.spark.csv.DefaultSource15), defaulting to the internal
datasource (org.apache.spark.sql.execution.datasources.csv.CSVFileFormat).
```
```scala
scala> spark.range(1).write.format("Csv").mode("overwrite").save("/tmp/abc")
17/05/10 09:47:52 WARN DataSource: Multiple sources found for Csv
(org.apache.spark.sql.execution.datasources.csv.CSVFileFormat,
com.databricks.spark.csv.DefaultSource15), defaulting to the internal
datasource (org.apache.spark.sql.execution.datasources.csv.CSVFileFormat).
```
(newlines were inserted for readability).
```scala
scala> spark.range(1).write.format("com.databricks.spark.csv").mode("overwrite").save("/tmp/abc")
```
```scala
scala> spark.range(1).write.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").mode("overwrite").save("/tmp/abc")
```
**negative cases**:
```scala
scala> spark.range(1).write.format("com.databricks.spark.csv.CsvRelation").save("/tmp/abc")
java.lang.InstantiationException: com.databricks.spark.csv.CsvRelation
...
```
```scala
scala> spark.range(1).write.format("com.databricks.spark.csv.CsvRelatio").save("/tmp/abc")
java.lang.ClassNotFoundException: Failed to find data source:
com.databricks.spark.csv.CsvRelatio. Please find packages at
http://spark.apache.org/third-party-projects.html
...
```
Author: hyukjinkwon <[email protected]>
Closes #17916 from HyukjinKwon/datasource-detect.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3d2131ab
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3d2131ab
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3d2131ab
Branch: refs/heads/master
Commit: 3d2131ab4ddead29601fb3c597b798202ac25fdd
Parents: 771abeb
Author: hyukjinkwon <[email protected]>
Authored: Wed May 10 13:44:47 2017 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Wed May 10 13:44:47 2017 +0800
----------------------------------------------------------------------
.../sql/execution/datasources/DataSource.scala | 19 ++++--
....apache.spark.sql.sources.DataSourceRegister | 4 ++
.../spark/sql/sources/DDLSourceLoadSuite.scala | 44 +++++++++++---
.../spark/sql/sources/fakeExternalSources.scala | 64 ++++++++++++++++++++
4 files changed, 117 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/3d2131ab/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index f3b209d..bb7d1f7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -481,7 +481,7 @@ case class DataSource(
}
}
-object DataSource {
+object DataSource extends Logging {
/** A map to maintain backward compatibility in case we move data sources around. */
private val backwardCompatibilityMap: Map[String, String] = {
@@ -570,10 +570,19 @@ object DataSource {
// there is exactly one registered alias
head.getClass
case sources =>
- // There are multiple registered aliases for the input
- sys.error(s"Multiple sources found for $provider1 " +
- s"(${sources.map(_.getClass.getName).mkString(", ")}), " +
- "please specify the fully qualified class name.")
+ // There are multiple registered aliases for the input. If there is single datasource
+ // that has "org.apache.spark" package in the prefix, we use it considering it is an
+ // internal datasource within Spark.
+ val sourceNames = sources.map(_.getClass.getName)
+ val internalSources = sources.filter(_.getClass.getName.startsWith("org.apache.spark"))
+ if (internalSources.size == 1) {
+ logWarning(s"Multiple sources found for $provider1 (${sourceNames.mkString(", ")}), " +
+ s"defaulting to the internal datasource (${internalSources.head.getClass.getName}).")
+ internalSources.head.getClass
+ } else {
+ throw new AnalysisException(s"Multiple sources found for $provider1 " +
+ s"(${sourceNames.mkString(", ")}), please specify the fully qualified class name.")
+ }
}
} catch {
case e: ServiceConfigurationError if e.getCause.isInstanceOf[NoClassDefFoundError] =>
http://git-wip-us.apache.org/repos/asf/spark/blob/3d2131ab/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index cfd7889..c6973bf 100644
--- a/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ b/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -1,3 +1,7 @@
org.apache.spark.sql.sources.FakeSourceOne
org.apache.spark.sql.sources.FakeSourceTwo
org.apache.spark.sql.sources.FakeSourceThree
+org.apache.spark.sql.sources.FakeSourceFour
+org.apache.fakesource.FakeExternalSourceOne
+org.apache.fakesource.FakeExternalSourceTwo
+org.apache.fakesource.FakeExternalSourceThree
http://git-wip-us.apache.org/repos/asf/spark/blob/3d2131ab/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala
index 85ba33e..b5fb740 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala
@@ -19,26 +19,39 @@ package org.apache.spark.sql.sources
import org.apache.spark.sql.{AnalysisException, SQLContext}
import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.types._
// please note that the META-INF/services had to be modified for the test directory for this to work
class DDLSourceLoadSuite extends DataSourceTest with SharedSQLContext {
- test("data sources with the same name") {
- intercept[RuntimeException] {
+ test("data sources with the same name - internal data sources") {
+ val e = intercept[AnalysisException] {
spark.read.format("Fluet da Bomb").load()
}
+ assert(e.getMessage.contains("Multiple sources found for Fluet da Bomb"))
+ }
+
+ test("data sources with the same name - internal data source/external data source") {
+ assert(spark.read.format("datasource").load().schema ==
+ StructType(Seq(StructField("longType", LongType, nullable = false))))
+ }
+
+ test("data sources with the same name - external data sources") {
+ val e = intercept[AnalysisException] {
+ spark.read.format("Fake external source").load()
+ }
+ assert(e.getMessage.contains("Multiple sources found for Fake external source"))
}
test("load data source from format alias") {
- spark.read.format("gathering quorum").load().schema ==
- StructType(Seq(StructField("stringType", StringType, nullable = false)))
+ assert(spark.read.format("gathering quorum").load().schema ==
+ StructType(Seq(StructField("stringType", StringType, nullable = false))))
}
test("specify full classname with duplicate formats") {
- spark.read.format("org.apache.spark.sql.sources.FakeSourceOne")
- .load().schema == StructType(Seq(StructField("stringType", StringType, nullable = false)))
+ assert(spark.read.format("org.apache.spark.sql.sources.FakeSourceOne")
+ .load().schema == StructType(Seq(StructField("stringType", StringType, nullable = false))))
}
test("should fail to load ORC without Hive Support") {
@@ -63,7 +76,7 @@ class FakeSourceOne extends RelationProvider with DataSourceRegister {
}
}
-class FakeSourceTwo extends RelationProvider with DataSourceRegister {
+class FakeSourceTwo extends RelationProvider with DataSourceRegister {
def shortName(): String = "Fluet da Bomb"
@@ -72,7 +85,7 @@ class FakeSourceTwo extends RelationProvider with DataSourceRegister {
override def sqlContext: SQLContext = cont
override def schema: StructType =
- StructType(Seq(StructField("stringType", StringType, nullable = false)))
+ StructType(Seq(StructField("integerType", IntegerType, nullable = false)))
}
}
@@ -88,3 +101,16 @@ class FakeSourceThree extends RelationProvider with DataSourceRegister {
StructType(Seq(StructField("stringType", StringType, nullable = false)))
}
}
+
+class FakeSourceFour extends RelationProvider with DataSourceRegister {
+
+ def shortName(): String = "datasource"
+
+ override def createRelation(cont: SQLContext, param: Map[String, String]): BaseRelation =
+ new BaseRelation {
+ override def sqlContext: SQLContext = cont
+
+ override def schema: StructType =
+ StructType(Seq(StructField("longType", LongType, nullable = false)))
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/3d2131ab/sql/core/src/test/scala/org/apache/spark/sql/sources/fakeExternalSources.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/fakeExternalSources.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/fakeExternalSources.scala
new file mode 100644
index 0000000..0dfd75e
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/fakeExternalSources.scala
@@ -0,0 +1,64 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.fakesource
+
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider}
+import org.apache.spark.sql.types._
+
+
+// Note that the package name is intendedly mismatched in order to resemble external data sources
+// and test the detection for them.
+class FakeExternalSourceOne extends RelationProvider with DataSourceRegister {
+
+ def shortName(): String = "Fake external source"
+
+ override def createRelation(cont: SQLContext, param: Map[String, String]): BaseRelation =
+ new BaseRelation {
+ override def sqlContext: SQLContext = cont
+
+ override def schema: StructType =
+ StructType(Seq(StructField("stringType", StringType, nullable = false)))
+ }
+}
+
+class FakeExternalSourceTwo extends RelationProvider with DataSourceRegister {
+
+ def shortName(): String = "Fake external source"
+
+ override def createRelation(cont: SQLContext, param: Map[String, String]): BaseRelation =
+ new BaseRelation {
+ override def sqlContext: SQLContext = cont
+
+ override def schema: StructType =
+ StructType(Seq(StructField("integerType", IntegerType, nullable = false)))
+ }
+}
+
+class FakeExternalSourceThree extends RelationProvider with DataSourceRegister {
+
+ def shortName(): String = "datasource"
+
+ override def createRelation(cont: SQLContext, param: Map[String, String]): BaseRelation =
+ new BaseRelation {
+ override def sqlContext: SQLContext = cont
+
+ override def schema: StructType =
+ StructType(Seq(StructField("byteType", ByteType, nullable = false)))
+ }
+}