Repository: spark
Updated Branches:
refs/heads/branch-2.1 65e896a6e -> 415730e19
[SPARK-18419][SQL] `JDBCRelation.insert` should not remove Spark options
## What changes were proposed in this pull request?
Currently, `JDBCRelation.insert` removes Spark options too early by mistakenly
using `asConnectionProperties`. Spark options like `numPartitions` should be
passed into `DataFrameWriter.jdbc` correctly. This bug has been **hidden**
because `JDBCOptions.asConnectionProperties` fails to filter out the mixed-case
options. This PR aims to fix both.
**JDBCRelation.insert**
```scala
override def insert(data: DataFrame, overwrite: Boolean): Unit = {
val url = jdbcOptions.url
val table = jdbcOptions.table
- val properties = jdbcOptions.asConnectionProperties
+ val properties = jdbcOptions.asProperties
data.write
.mode(if (overwrite) SaveMode.Overwrite else SaveMode.Append)
.jdbc(url, table, properties)
```
**JDBCOptions.asConnectionProperties**
```scala
scala> import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
scala> import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
scala> new JDBCOptions(Map("url" -> "jdbc:mysql://localhost:3306/temp",
"dbtable" -> "t1", "numPartitions" -> "10")).asConnectionProperties
res0: java.util.Properties = {numpartitions=10}
scala> new JDBCOptions(new CaseInsensitiveMap(Map("url" ->
"jdbc:mysql://localhost:3306/temp", "dbtable" -> "t1", "numPartitions" ->
"10"))).asConnectionProperties
res1: java.util.Properties = {numpartitions=10}
```
## How was this patch tested?
Pass the Jenkins with a new test case.
Author: Dongjoon Hyun <[email protected]>
Closes #15863 from dongjoon-hyun/SPARK-18419.
(cherry picked from commit 55d528f2ba0ba689dbb881616d9436dc7958e943)
Signed-off-by: Wenchen Fan <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/415730e1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/415730e1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/415730e1
Branch: refs/heads/branch-2.1
Commit: 415730e19cea3a0e7ea5491bf801a22859bbab66
Parents: 65e896a
Author: Dongjoon Hyun <[email protected]>
Authored: Fri Dec 2 21:48:22 2016 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Fri Dec 2 21:48:50 2016 +0800
----------------------------------------------------------------------
.../datasources/jdbc/JDBCOptions.scala | 23 +++++++++++++++-----
.../execution/datasources/jdbc/JDBCRDD.scala | 1 -
.../datasources/jdbc/JDBCRelation.scala | 2 +-
.../org/apache/spark/sql/jdbc/JDBCSuite.scala | 10 +++++++++
4 files changed, 28 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/415730e1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
index 7f419b5..d94fa7e 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -20,8 +20,6 @@ package org.apache.spark.sql.execution.datasources.jdbc
import java.sql.{Connection, DriverManager}
import java.util.Properties
-import scala.collection.mutable.ArrayBuffer
-
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
/**
@@ -41,10 +39,23 @@ class JDBCOptions(
JDBCOptions.JDBC_TABLE_NAME -> table)))
}
+ /**
+ * Returns a property with all options.
+ */
+ val asProperties: Properties = {
+ val properties = new Properties()
+ parameters.foreach { case (k, v) => properties.setProperty(k, v) }
+ properties
+ }
+
+ /**
+ * Returns a property with all options except Spark internal data source
options like `url`,
`dbtable`, and `numPartitions`. This should be used when invoking JDBC API
like `Driver.connect`
+ * because each DBMS vendor has its own property list for JDBC driver. See
SPARK-17776.
+ */
val asConnectionProperties: Properties = {
val properties = new Properties()
- // We should avoid to pass the options into properties. See SPARK-17776.
- parameters.filterKeys(!jdbcOptionNames.contains(_))
+ parameters.filterKeys(key => !jdbcOptionNames(key.toLowerCase))
.foreach { case (k, v) => properties.setProperty(k, v) }
properties
}
@@ -125,10 +136,10 @@ class JDBCOptions(
}
object JDBCOptions {
- private val jdbcOptionNames = ArrayBuffer.empty[String]
+ private val jdbcOptionNames = collection.mutable.Set[String]()
private def newOption(name: String): String = {
- jdbcOptionNames += name
+ jdbcOptionNames += name.toLowerCase
name
}
http://git-wip-us.apache.org/repos/asf/spark/blob/415730e1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index 37df283..d5b11e7 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -54,7 +54,6 @@ object JDBCRDD extends Logging {
def resolveTable(options: JDBCOptions): StructType = {
val url = options.url
val table = options.table
- val properties = options.asConnectionProperties
val dialect = JdbcDialects.get(url)
val conn: Connection = JdbcUtils.createConnectionFactory(options)()
try {
http://git-wip-us.apache.org/repos/asf/spark/blob/415730e1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
index 5ca1c75..8b45dba 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
@@ -131,7 +131,7 @@ private[sql] case class JDBCRelation(
override def insert(data: DataFrame, overwrite: Boolean): Unit = {
val url = jdbcOptions.url
val table = jdbcOptions.table
- val properties = jdbcOptions.asConnectionProperties
+ val properties = jdbcOptions.asProperties
data.write
.mode(if (overwrite) SaveMode.Overwrite else SaveMode.Append)
.jdbc(url, table, properties)
http://git-wip-us.apache.org/repos/asf/spark/blob/415730e1/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index aa1ab14..4c964bf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -26,6 +26,7 @@ import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.DataSourceScanExec
import org.apache.spark.sql.execution.command.ExplainCommand
import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -890,4 +891,13 @@ class JDBCSuite extends SparkFunSuite
assert(sql("SELECT * FROM mixedCaseCols WHERE Id = 1 OR Name =
'mary'").collect().size == 2)
assert(sql("SELECT * FROM mixedCaseCols WHERE Name = 'mary' AND Id =
2").collect().size == 1)
}
+
+ test("SPARK-18419: Fix `asConnectionProperties` to filter
case-insensitively") {
+ val parameters = Map(
+ "url" -> "jdbc:mysql://localhost:3306/temp",
+ "dbtable" -> "t1",
+ "numPartitions" -> "10")
+ assert(new JDBCOptions(parameters).asConnectionProperties.isEmpty)
+ assert(new JDBCOptions(new
CaseInsensitiveMap(parameters)).asConnectionProperties.isEmpty)
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]