Repository: spark
Updated Branches:
  refs/heads/master 0d589ba00 -> 4816c2ef5


[SPARK-15648][SQL] Add teradataDialect for JDBC connection to Teradata

The contribution is my original work and I license the work to the project 
under the project’s open source license.

Note: the Teradata JDBC connector limits the row size to 64K. The default 
string data type I used is a VARCHAR of 255 characters/bytes. This 
effectively limits the maximum number of string columns to about 250 when 
using the Teradata JDBC connector.
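The limit of roughly 250 string columns follows from dividing the row-size limit by the width of one VARCHAR(255) column. The sketch below only does that arithmetic. It assumes that "64K" means 65,536 bytes and ignores per-row and per-column overhead, which is why the practical limit is lower. The two-bytes-per-character case is an extra assumption (for example, a two-byte server character set) and is not something this change tested. `RowSizeBudget` and `maxStringColumns` are hypothetical names used only for illustration.

```scala
// Back-of-the-envelope bound on how many VARCHAR(255) columns fit in one row.
// Hypothetical helper for illustration; not part of Spark or the Teradata driver.
object RowSizeBudget {
  // Assumption: the connector's "64K" row limit read as 65,536 bytes.
  val RowLimitBytes: Int = 64 * 1024
  // Length the Teradata dialect uses for StringType columns.
  val VarcharLength: Int = 255

  /** Upper bound on string columns per row, ignoring all row and column overhead. */
  def maxStringColumns(rowLimitBytes: Int, varcharLength: Int, bytesPerChar: Int): Int = {
    require(varcharLength > 0 && bytesPerChar > 0, "lengths must be positive")
    rowLimitBytes / (varcharLength * bytesPerChar)
  }
}
```

With one byte per character, `maxStringColumns(RowSizeBudget.RowLimitBytes, RowSizeBudget.VarcharLength, 1)` gives 257, an upper bound that sits just above the practical figure of about 250 once overhead is counted. With two bytes per character the bound drops to 128.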

## What changes were proposed in this pull request?

Added a TeradataDialect for JDBC connections to Teradata. The Teradata dialect 
uses VARCHAR(255) in place of TEXT for string data types, and CHAR(1) in place 
of BIT(1) for boolean data types.
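The dialect only has to override these two types because, as we understand Spark's `JdbcUtils`, a dialect's `getJDBCType` result is tried first and a `None` falls back to the generic JDBC mapping (which is where TEXT and BIT(1) come from). The Spark-free model below sketches that precedence. `SimpleType`, `StringT`, `BooleanT`, `IntegerT` and the object name are hypothetical stand-ins for Spark's `DataType` hierarchy. Only the two Teradata overrides come from this commit.

```scala
// Spark-free model of "dialect override first, generic mapping second".
// All names here are hypothetical; they only mirror the shape of the real code.
object TeradataTypeMappingSketch {
  sealed trait SimpleType
  case object StringT extends SimpleType
  case object BooleanT extends SimpleType
  case object IntegerT extends SimpleType

  // Overrides contributed by the Teradata dialect; anything else means "no opinion".
  def teradataOverride(t: SimpleType): Option[String] = t match {
    case StringT  => Some("VARCHAR(255)")
    case BooleanT => Some("CHAR(1)")
    case _        => None
  }

  // Generic defaults that the Teradata dialect replaces for strings and booleans.
  def commonDefault(t: SimpleType): Option[String] = t match {
    case StringT  => Some("TEXT")
    case BooleanT => Some("BIT(1)")
    case IntegerT => Some("INTEGER")
  }

  // Resolve a column type: the dialect wins, otherwise use the generic mapping.
  def resolve(t: SimpleType): String =
    teradataOverride(t)
      .orElse(commonDefault(t))
      .getOrElse(throw new IllegalArgumentException(s"No JDBC type for $t"))
}
```

Returning `None` rather than repeating the generic mappings keeps the dialect small: types such as integers keep their default definitions unless Teradata needs something different.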

## How was this patch tested?

I added two unit tests to check that the types are set correctly for a 
Teradata JDBC URL. I also ran a couple of manual tests to make sure the JDBC 
connector worked with Teradata and that an error was thrown if a row could 
potentially exceed 64K (this error comes from the Teradata JDBC connector, not 
from the Spark code). I did not check how string columns longer than 255 
characters are handled.
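The unit tests obtain the dialect with `JdbcDialects.get("jdbc:teradata://127.0.0.1/db")`, which relies on each registered dialect's `canHandle` check against the URL. The model below is a simplified sketch of that lookup: it returns the first dialect whose prefix matches, whereas Spark's real `JdbcDialects.get` also combines several matching dialects and falls back to a no-op dialect when none match. `DialectLookupSketch`, `Dialect` and the non-Teradata prefixes are hypothetical and chosen only for illustration; the `"jdbc:teradata"` prefix test matches `TeradataDialect.canHandle` in this commit.

```scala
// Simplified model of URL-based dialect lookup; not Spark's actual implementation.
object DialectLookupSketch {
  final case class Dialect(name: String, urlPrefix: String) {
    // Same style of check as TeradataDialect.canHandle: a plain prefix match.
    def canHandle(url: String): Boolean = url.startsWith(urlPrefix)
  }

  // Hypothetical registry; only the Teradata prefix comes from this commit.
  val registered: Seq[Dialect] = Seq(
    Dialect("MySQL", "jdbc:mysql"),
    Dialect("Postgres", "jdbc:postgresql"),
    Dialect("Teradata", "jdbc:teradata")
  )

  // First matching dialect, or None when no registered dialect handles the URL.
  def lookup(url: String): Option[Dialect] = registered.find(_.canHandle(url))
}
```

For example, `DialectLookupSketch.lookup("jdbc:teradata://127.0.0.1/db")` selects the Teradata entry, while an unrelated URL such as `jdbc:h2:mem:testdb` matches nothing in this model.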

Author: Kirby Linvill <[email protected]>
Author: klinvill <[email protected]>

Closes #16746 from klinvill/master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4816c2ef
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4816c2ef
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4816c2ef

Branch: refs/heads/master
Commit: 4816c2ef5e04eb2dd70bed8b99882aa0b7fe7fd7
Parents: 0d589ba
Author: Kirby Linvill <[email protected]>
Authored: Tue May 23 12:00:58 2017 -0700
Committer: Xiao Li <[email protected]>
Committed: Tue May 23 12:00:58 2017 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/jdbc/JdbcDialects.scala    |  1 +
 .../apache/spark/sql/jdbc/TeradataDialect.scala | 34 ++++++++++++++++++++
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala   | 12 +++++++
 3 files changed, 47 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4816c2ef/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index e328b86..a86a86d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -174,6 +174,7 @@ object JdbcDialects {
   registerDialect(MsSqlServerDialect)
   registerDialect(DerbyDialect)
   registerDialect(OracleDialect)
+  registerDialect(TeradataDialect)
 
   /**
    * Fetch the JdbcDialect class corresponding to a given database url.

http://git-wip-us.apache.org/repos/asf/spark/blob/4816c2ef/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala
new file mode 100644
index 0000000..5749b79
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.jdbc
+
+import java.sql.Types
+
+import org.apache.spark.sql.types._
+
+
+private case object TeradataDialect extends JdbcDialect {
+
+  override def canHandle(url: String): Boolean = { url.startsWith("jdbc:teradata") }
+
+  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
+    case StringType => Some(JdbcType("VARCHAR(255)", java.sql.Types.VARCHAR))
+    case BooleanType => Option(JdbcType("CHAR(1)", java.sql.Types.CHAR))
+    case _ => None
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/4816c2ef/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index d9f3689..70bee92 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -922,6 +922,18 @@ class JDBCSuite extends SparkFunSuite
     assert(e2.contains("User specified schema not supported with `jdbc`"))
   }
 
+  test("SPARK-15648: teradataDialect StringType data mapping") {
+    val teradataDialect = JdbcDialects.get("jdbc:teradata://127.0.0.1/db")
+    assert(teradataDialect.getJDBCType(StringType).
+      map(_.databaseTypeDefinition).get == "VARCHAR(255)")
+  }
+
+  test("SPARK-15648: teradataDialect BooleanType data mapping") {
+    val teradataDialect = JdbcDialects.get("jdbc:teradata://127.0.0.1/db")
+    assert(teradataDialect.getJDBCType(BooleanType).
+      map(_.databaseTypeDefinition).get == "CHAR(1)")
+  }
+
   test("Checking metrics correctness with JDBC") {
     val foobarCnt = spark.table("foobar").count()
     val res = InputOutputMetricsHelper.run(sql("SELECT * FROM foobar").toDF())

