Repository: spark
Updated Branches:
  refs/heads/master 29b1f6b09 -> a3c29fcbb


[SPARK-19726][SQL] Failed to insert null timestamp value to MySQL using Spark JDBC

## What changes were proposed in this pull request?

When creating a table like the following:
> create table timestamp_test(id int(11), time_stamp timestamp not null default 
> current_timestamp);

the result of executing "insert into timestamp_test values (111, null)" differs
between Spark (writing through JDBC) and the MySQL client:
```
mysql> select * from timestamp_test;
+------+---------------------+
| id   | time_stamp          |
+------+---------------------+
|  111 | 1970-01-01 00:00:00 | -> spark
|  111 | 2017-06-27 19:32:38 | -> mysql
+------+---------------------+
2 rows in set (0.00 sec)
```
Because ```StructField.nullable``` is false in this case, the code generated for
```InvokeLike``` and ```BoundReference``` does not check whether the field is null;
it reads the value directly via ```CodegenContext.INPUT_ROW.getLong(1)```. However,
```UnsafeRow.setNullAt(1)``` writes 0 into the underlying memory, so the null is
read back as 0, which decodes to the epoch timestamp 1970-01-01 00:00:00.
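
To make this concrete, here is a minimal sketch against Catalyst's internal API
(```UnsafeProjection``` and ```InternalRow``` are internal, unstable interfaces; the
two-field schema is chosen to mirror the example table) showing that a null long
field reads back as 0 once the null check is skipped:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.types._

// Build an UnsafeRow with schema (int, long) where the long field is null.
val proj = UnsafeProjection.create(Array[DataType](IntegerType, LongType))
val row = proj(InternalRow(111, null))

row.isNullAt(1) // true: the null bit is set
row.getLong(1)  // 0L: setNullAt zeroed the 8-byte word, which a reader that
                // skips the null check decodes as 1970-01-01 00:00:00
```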

The PR ```always``` sets ```StructField.nullable``` to true after obtaining metadata
from the JDBC connection, since MySQL allows inserting null into a NOT NULL
timestamp column (the column default is applied instead). This way, Spark
propagates the null to the underlying DB engine and lets the DB choose how to
process it.
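
As an illustration of the new ```alwaysNullable``` flag (a sketch only:
```JdbcUtils``` is an internal API, and the URL and credentials are placeholders
for a reachable MySQL instance with the table above):

```scala
import java.sql.DriverManager
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
import org.apache.spark.sql.jdbc.JdbcDialects

val url = "jdbc:mysql://localhost:3306/test?user=root&password=secret"
val conn = DriverManager.getConnection(url)
try {
  // Empty result set, used only for its metadata.
  val rs = conn.prepareStatement(
    "SELECT * FROM timestamp_test WHERE 1 = 0").executeQuery()
  val schema = JdbcUtils.getSchema(rs, JdbcDialects.get(url), alwaysNullable = true)
  // Every field now reports nullable = true, even though the driver
  // reports time_stamp as NOT NULL.
  assert(schema.fields.forall(_.nullable))
} finally {
  conn.close()
}
```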

## How was this patch tested?

Added tests.


Author: YIHAODIAN\wangshuangshuang <[email protected]>
Author: Shuangshuang Wang <[email protected]>

Closes #18445 from shuangshuangwang/SPARK-19726.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a3c29fcb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a3c29fcb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a3c29fcb

Branch: refs/heads/master
Commit: a3c29fcbbda02c1528b4185bcb880c91077d480c
Parents: 29b1f6b
Author: YIHAODIAN\wangshuangshuang <[email protected]>
Authored: Tue Jul 4 09:44:27 2017 -0700
Committer: gatorsmile <[email protected]>
Committed: Tue Jul 4 09:44:27 2017 -0700

----------------------------------------------------------------------
 .../spark/sql/execution/datasources/jdbc/JDBCRDD.scala  |  2 +-
 .../sql/execution/datasources/jdbc/JdbcUtils.scala      | 12 ++++++++++--
 .../org/apache/spark/sql/jdbc/JDBCWriteSuite.scala      |  8 ++++++++
 3 files changed, 19 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a3c29fcb/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index 0f53b5c..57e9bc9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -59,7 +59,7 @@ object JDBCRDD extends Logging {
       try {
         val rs = statement.executeQuery()
         try {
-          JdbcUtils.getSchema(rs, dialect)
+          JdbcUtils.getSchema(rs, dialect, alwaysNullable = true)
         } finally {
           rs.close()
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/a3c29fcb/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index ca61c2e..55b2539 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -266,10 +266,14 @@ object JdbcUtils extends Logging {
   /**
    * Takes a [[ResultSet]] and returns its Catalyst schema.
    *
+   * @param alwaysNullable If true, all the columns are nullable.
    * @return A [[StructType]] giving the Catalyst schema.
    * @throws SQLException if the schema contains an unsupported type.
    */
-  def getSchema(resultSet: ResultSet, dialect: JdbcDialect): StructType = {
+  def getSchema(
+      resultSet: ResultSet,
+      dialect: JdbcDialect,
+      alwaysNullable: Boolean = false): StructType = {
     val rsmd = resultSet.getMetaData
     val ncols = rsmd.getColumnCount
     val fields = new Array[StructField](ncols)
@@ -290,7 +294,11 @@ object JdbcUtils extends Logging {
            rsmd.getClass.getName == "org.apache.hive.jdbc.HiveResultSetMetaData" => true
         }
       }
-      val nullable = rsmd.isNullable(i + 1) != ResultSetMetaData.columnNoNulls
+      val nullable = if (alwaysNullable) {
+        true
+      } else {
+        rsmd.isNullable(i + 1) != ResultSetMetaData.columnNoNulls
+      }
       val metadata = new MetadataBuilder()
         .putString("name", columnName)
         .putLong("scale", fieldScale)

http://git-wip-us.apache.org/repos/asf/spark/blob/a3c29fcb/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index bf1fd16..92f50a0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters.propertiesAsScalaMapConverter
 
 import org.scalatest.BeforeAndAfter
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode}
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
@@ -506,4 +507,11 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
         "schema struct<name:string,id:int>"))
     }
   }
+
+  test("SPARK-19726: INSERT null to a NOT NULL column") {
+    val e = intercept[SparkException] {
+      sql("INSERT INTO PEOPLE1 values (null, null)")
+    }.getMessage
+    assert(e.contains("NULL not allowed for column \"NAME\""))
+  }
 }

