Repository: spark
Updated Branches:
  refs/heads/branch-1.6 29f3a2fc8 -> 6c31c20ea


[SPARK-11881][SQL] Fix for postgresql fetchsize > 0

Reference: 
https://jdbc.postgresql.org/documentation/head/query.html#query-with-cursor
For the PostgreSQL JDBC driver to honor a non-zero fetchSize, autoCommit must
be disabled on the Connection (setAutoCommit(false)). Otherwise the driver
silently ignores the fetchSize setting and caches all rows of the result.

This adds a new side-effecting, dialect-specific beforeFetch method that is 
called before a select query is run.

Author: mariusvniekerk <[email protected]>

Closes #9861 from mariusvniekerk/SPARK-11881.

(cherry picked from commit b63938a8b04a30feb6b2255c4d4e530a74855afc)
Signed-off-by: Reynold Xin <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6c31c20e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6c31c20e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6c31c20e

Branch: refs/heads/branch-1.6
Commit: 6c31c20ea941ef9efac87fd55b178b76baab9aa4
Parents: 29f3a2f
Author: mariusvniekerk <[email protected]>
Authored: Thu Nov 26 19:13:16 2015 -0800
Committer: Reynold Xin <[email protected]>
Committed: Thu Nov 26 19:13:36 2015 -0800

----------------------------------------------------------------------
 .../sql/execution/datasources/jdbc/JDBCRDD.scala   | 12 ++++++++++++
 .../org/apache/spark/sql/jdbc/JdbcDialects.scala   | 11 +++++++++++
 .../apache/spark/sql/jdbc/PostgresDialect.scala    | 17 ++++++++++++++++-
 3 files changed, 39 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6c31c20e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index 89c850c..f9b7259 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -224,6 +224,7 @@ private[sql] object JDBCRDD extends Logging {
       quotedColumns,
       filters,
       parts,
+      url,
       properties)
   }
 }
@@ -241,6 +242,7 @@ private[sql] class JDBCRDD(
     columns: Array[String],
     filters: Array[Filter],
     partitions: Array[Partition],
+    url: String,
     properties: Properties)
   extends RDD[InternalRow](sc, Nil) {
 
@@ -361,6 +363,9 @@ private[sql] class JDBCRDD(
     context.addTaskCompletionListener{ context => close() }
     val part = thePart.asInstanceOf[JDBCPartition]
     val conn = getConnection()
+    val dialect = JdbcDialects.get(url)
+    import scala.collection.JavaConverters._
+    dialect.beforeFetch(conn, properties.asScala.toMap)
 
     // H2's JDBC driver does not support the setSchema() method.  We pass a
     // fully-qualified table name in the SELECT statement.  I don't know how to
@@ -489,6 +494,13 @@ private[sql] class JDBCRDD(
       }
       try {
         if (null != conn) {
+          if (!conn.getAutoCommit && !conn.isClosed) {
+            try {
+              conn.commit()
+            } catch {
+              case e: Throwable => logWarning("Exception committing 
transaction", e)
+            }
+          }
           conn.close()
         }
         logInfo("closed connection")

http://git-wip-us.apache.org/repos/asf/spark/blob/6c31c20e/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index b3b2cb6..13db141 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.jdbc
 
+import java.sql.Connection
+
 import org.apache.spark.sql.types._
 import org.apache.spark.annotation.DeveloperApi
 
@@ -97,6 +99,15 @@ abstract class JdbcDialect extends Serializable {
     s"SELECT * FROM $table WHERE 1=0"
   }
 
+  /**
+    * Override connection specific properties to run before a select is made.  
This is in place to
+    * allow dialects that need special treatment to optimize behavior.
+    * @param connection The connection object
+    * @param properties The connection properties.  This is passed through 
from the relation.
+    */
+  def beforeFetch(connection: Connection, properties: Map[String, String]): 
Unit = {
+  }
+
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/6c31c20e/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
index ed3faa1..3cf80f5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.jdbc
 
-import java.sql.Types
+import java.sql.{Connection, Types}
 
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
 import org.apache.spark.sql.types._
@@ -70,4 +70,19 @@ private object PostgresDialect extends JdbcDialect {
   override def getTableExistsQuery(table: String): String = {
     s"SELECT 1 FROM $table LIMIT 1"
   }
+
+  override def beforeFetch(connection: Connection, properties: Map[String, 
String]): Unit = {
+    super.beforeFetch(connection, properties)
+
+    // According to the postgres jdbc documentation we need to be in 
autocommit=false if we actually
+    // want to have fetchsize be non 0 (all the rows).  This allows us to not 
have to cache all the
+    // rows inside the driver when fetching.
+    //
+    // See: 
https://jdbc.postgresql.org/documentation/head/query.html#query-with-cursor
+    //
+    if (properties.getOrElse("fetchsize", "0").toInt > 0) {
+      connection.setAutoCommit(false)
+    }
+
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to