Repository: spark
Updated Branches:
  refs/heads/branch-2.2 6a996b362 -> 7b6f3a118


[SPARK-20590][SQL] Use Spark internal datasource if multiples are found for the 
same shorten name

## What changes were proposed in this pull request?

One of the common usability problems around reading data in spark (particularly 
CSV) is that there can often be a conflict between different readers in the 
classpath.

As an example, if someone launches a 2.x spark shell with the spark-csv package 
in the classpath, Spark currently fails in an extremely unfriendly way (see 
databricks/spark-csv#367):

```bash
./bin/spark-shell --packages com.databricks:spark-csv_2.11:1.5.0
scala> val df = spark.read.csv("/foo/bar.csv")
java.lang.RuntimeException: Multiple sources found for csv 
(org.apache.spark.sql.execution.datasources.csv.CSVFileFormat, 
com.databricks.spark.csv.DefaultSource15), please specify the fully qualified 
class name.
  at scala.sys.package$.error(package.scala:27)
  at 
org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:574)
  at 
org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:85)
  at 
org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:85)
  at 
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:295)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
  at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:533)
  at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:412)
  ... 48 elided
```

This PR proposes a simple way of fixing this error by picking up the internal 
datasource if there is single (the datasource that has "org.apache.spark" 
prefix).

```scala
scala> spark.range(1).write.format("csv").mode("overwrite").save("/tmp/abc")
17/05/10 09:47:44 WARN DataSource: Multiple sources found for csv 
(org.apache.spark.sql.execution.datasources.csv.CSVFileFormat,
com.databricks.spark.csv.DefaultSource15), defaulting to the internal 
datasource (org.apache.spark.sql.execution.datasources.csv.CSVFileFormat).
```

```scala
scala> spark.range(1).write.format("Csv").mode("overwrite").save("/tmp/abc")
17/05/10 09:47:52 WARN DataSource: Multiple sources found for Csv 
(org.apache.spark.sql.execution.datasources.csv.CSVFileFormat,
com.databricks.spark.csv.DefaultSource15), defaulting to the internal 
datasource (org.apache.spark.sql.execution.datasources.csv.CSVFileFormat).
```

## How was this patch tested?

Manually tested as below:

```bash
./bin/spark-shell --packages com.databricks:spark-csv_2.11:1.5.0
```

```scala
spark.sparkContext.setLogLevel("WARN")
```

**positive cases**:

```scala
scala> spark.range(1).write.format("csv").mode("overwrite").save("/tmp/abc")
17/05/10 09:47:44 WARN DataSource: Multiple sources found for csv 
(org.apache.spark.sql.execution.datasources.csv.CSVFileFormat,
com.databricks.spark.csv.DefaultSource15), defaulting to the internal 
datasource (org.apache.spark.sql.execution.datasources.csv.CSVFileFormat).
```

```scala
scala> spark.range(1).write.format("Csv").mode("overwrite").save("/tmp/abc")
17/05/10 09:47:52 WARN DataSource: Multiple sources found for Csv 
(org.apache.spark.sql.execution.datasources.csv.CSVFileFormat,
com.databricks.spark.csv.DefaultSource15), defaulting to the internal 
datasource (org.apache.spark.sql.execution.datasources.csv.CSVFileFormat).
```

(newlines were inserted for readability).

```scala
scala> 
spark.range(1).write.format("com.databricks.spark.csv").mode("overwrite").save("/tmp/abc")
```

```scala
scala> 
spark.range(1).write.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").mode("overwrite").save("/tmp/abc")
```

**negative cases**:

```scala
scala> 
spark.range(1).write.format("com.databricks.spark.csv.CsvRelation").save("/tmp/abc")
java.lang.InstantiationException: com.databricks.spark.csv.CsvRelation
...
```

```scala
scala> 
spark.range(1).write.format("com.databricks.spark.csv.CsvRelatio").save("/tmp/abc")
java.lang.ClassNotFoundException: Failed to find data source: 
com.databricks.spark.csv.CsvRelatio. Please find packages at 
http://spark.apache.org/third-party-projects.html
...
```

Author: hyukjinkwon <[email protected]>

Closes #17916 from HyukjinKwon/datasource-detect.

(cherry picked from commit 3d2131ab4ddead29601fb3c597b798202ac25fdd)
Signed-off-by: Wenchen Fan <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7b6f3a11
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7b6f3a11
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7b6f3a11

Branch: refs/heads/branch-2.2
Commit: 7b6f3a118e973216264bbf356af2bb1e7870466e
Parents: 6a996b3
Author: hyukjinkwon <[email protected]>
Authored: Wed May 10 13:44:47 2017 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Wed May 10 13:45:07 2017 +0800

----------------------------------------------------------------------
 .../sql/execution/datasources/DataSource.scala  | 19 ++++--
 ....apache.spark.sql.sources.DataSourceRegister |  4 ++
 .../spark/sql/sources/DDLSourceLoadSuite.scala  | 44 +++++++++++---
 .../spark/sql/sources/fakeExternalSources.scala | 64 ++++++++++++++++++++
 4 files changed, 117 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7b6f3a11/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index f3b209d..bb7d1f7 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -481,7 +481,7 @@ case class DataSource(
   }
 }
 
-object DataSource {
+object DataSource extends Logging {
 
   /** A map to maintain backward compatibility in case we move data sources 
around. */
   private val backwardCompatibilityMap: Map[String, String] = {
@@ -570,10 +570,19 @@ object DataSource {
           // there is exactly one registered alias
           head.getClass
         case sources =>
-          // There are multiple registered aliases for the input
-          sys.error(s"Multiple sources found for $provider1 " +
-            s"(${sources.map(_.getClass.getName).mkString(", ")}), " +
-            "please specify the fully qualified class name.")
+          // There are multiple registered aliases for the input. If there is 
single datasource
+          // that has "org.apache.spark" package in the prefix, we use it 
considering it is an
+          // internal datasource within Spark.
+          val sourceNames = sources.map(_.getClass.getName)
+          val internalSources = 
sources.filter(_.getClass.getName.startsWith("org.apache.spark"))
+          if (internalSources.size == 1) {
+            logWarning(s"Multiple sources found for $provider1 
(${sourceNames.mkString(", ")}), " +
+              s"defaulting to the internal datasource 
(${internalSources.head.getClass.getName}).")
+            internalSources.head.getClass
+          } else {
+            throw new AnalysisException(s"Multiple sources found for 
$provider1 " +
+              s"(${sourceNames.mkString(", ")}), please specify the fully 
qualified class name.")
+          }
       }
     } catch {
       case e: ServiceConfigurationError if 
e.getCause.isInstanceOf[NoClassDefFoundError] =>

http://git-wip-us.apache.org/repos/asf/spark/blob/7b6f3a11/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
 
b/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index cfd7889..c6973bf 100644
--- 
a/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ 
b/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -1,3 +1,7 @@
 org.apache.spark.sql.sources.FakeSourceOne
 org.apache.spark.sql.sources.FakeSourceTwo
 org.apache.spark.sql.sources.FakeSourceThree
+org.apache.spark.sql.sources.FakeSourceFour
+org.apache.fakesource.FakeExternalSourceOne
+org.apache.fakesource.FakeExternalSourceTwo
+org.apache.fakesource.FakeExternalSourceThree

http://git-wip-us.apache.org/repos/asf/spark/blob/7b6f3a11/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala
index 85ba33e..b5fb740 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala
@@ -19,26 +19,39 @@ package org.apache.spark.sql.sources
 
 import org.apache.spark.sql.{AnalysisException, SQLContext}
 import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.types._
 
 
 // please note that the META-INF/services had to be modified for the test 
directory for this to work
 class DDLSourceLoadSuite extends DataSourceTest with SharedSQLContext {
 
-  test("data sources with the same name") {
-    intercept[RuntimeException] {
+  test("data sources with the same name - internal data sources") {
+    val e = intercept[AnalysisException] {
       spark.read.format("Fluet da Bomb").load()
     }
+    assert(e.getMessage.contains("Multiple sources found for Fluet da Bomb"))
+  }
+
+  test("data sources with the same name - internal data source/external data 
source") {
+    assert(spark.read.format("datasource").load().schema ==
+      StructType(Seq(StructField("longType", LongType, nullable = false))))
+  }
+
+  test("data sources with the same name - external data sources") {
+    val e = intercept[AnalysisException] {
+      spark.read.format("Fake external source").load()
+    }
+    assert(e.getMessage.contains("Multiple sources found for Fake external 
source"))
   }
 
   test("load data source from format alias") {
-    spark.read.format("gathering quorum").load().schema ==
-      StructType(Seq(StructField("stringType", StringType, nullable = false)))
+    assert(spark.read.format("gathering quorum").load().schema ==
+      StructType(Seq(StructField("stringType", StringType, nullable = false))))
   }
 
   test("specify full classname with duplicate formats") {
-    spark.read.format("org.apache.spark.sql.sources.FakeSourceOne")
-      .load().schema == StructType(Seq(StructField("stringType", StringType, 
nullable = false)))
+    assert(spark.read.format("org.apache.spark.sql.sources.FakeSourceOne")
+      .load().schema == StructType(Seq(StructField("stringType", StringType, 
nullable = false))))
   }
 
   test("should fail to load ORC without Hive Support") {
@@ -63,7 +76,7 @@ class FakeSourceOne extends RelationProvider with 
DataSourceRegister {
     }
 }
 
-class FakeSourceTwo extends RelationProvider  with DataSourceRegister {
+class FakeSourceTwo extends RelationProvider with DataSourceRegister {
 
   def shortName(): String = "Fluet da Bomb"
 
@@ -72,7 +85,7 @@ class FakeSourceTwo extends RelationProvider  with 
DataSourceRegister {
       override def sqlContext: SQLContext = cont
 
       override def schema: StructType =
-        StructType(Seq(StructField("stringType", StringType, nullable = 
false)))
+        StructType(Seq(StructField("integerType", IntegerType, nullable = 
false)))
     }
 }
 
@@ -88,3 +101,16 @@ class FakeSourceThree extends RelationProvider with 
DataSourceRegister {
         StructType(Seq(StructField("stringType", StringType, nullable = 
false)))
     }
 }
+
+class FakeSourceFour extends RelationProvider with DataSourceRegister {
+
+  def shortName(): String = "datasource"
+
+  override def createRelation(cont: SQLContext, param: Map[String, String]): 
BaseRelation =
+    new BaseRelation {
+      override def sqlContext: SQLContext = cont
+
+      override def schema: StructType =
+        StructType(Seq(StructField("longType", LongType, nullable = false)))
+    }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/7b6f3a11/sql/core/src/test/scala/org/apache/spark/sql/sources/fakeExternalSources.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/fakeExternalSources.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/fakeExternalSources.scala
new file mode 100644
index 0000000..0dfd75e
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/fakeExternalSources.scala
@@ -0,0 +1,64 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.fakesource
+
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, 
RelationProvider}
+import org.apache.spark.sql.types._
+
+
+// Note that the package name is intendedly mismatched in order to resemble 
external data sources
+// and test the detection for them.
+class FakeExternalSourceOne extends RelationProvider with DataSourceRegister {
+
+  def shortName(): String = "Fake external source"
+
+  override def createRelation(cont: SQLContext, param: Map[String, String]): 
BaseRelation =
+    new BaseRelation {
+      override def sqlContext: SQLContext = cont
+
+      override def schema: StructType =
+        StructType(Seq(StructField("stringType", StringType, nullable = 
false)))
+    }
+}
+
+class FakeExternalSourceTwo extends RelationProvider with DataSourceRegister {
+
+  def shortName(): String = "Fake external source"
+
+  override def createRelation(cont: SQLContext, param: Map[String, String]): 
BaseRelation =
+    new BaseRelation {
+      override def sqlContext: SQLContext = cont
+
+      override def schema: StructType =
+        StructType(Seq(StructField("integerType", IntegerType, nullable = 
false)))
+    }
+}
+
+class FakeExternalSourceThree extends RelationProvider with DataSourceRegister 
{
+
+  def shortName(): String = "datasource"
+
+  override def createRelation(cont: SQLContext, param: Map[String, String]): 
BaseRelation =
+    new BaseRelation {
+      override def sqlContext: SQLContext = cont
+
+      override def schema: StructType =
+        StructType(Seq(StructField("byteType", ByteType, nullable = false)))
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to