Repository: spark Updated Branches: refs/heads/branch-1.6 33cd171b2 -> a589736a1
[SPARK-11989][SQL] Only use commit in JDBC data source if the underlying database supports transactions Fixes [SPARK-11989](https://issues.apache.org/jira/browse/SPARK-11989) Author: CK50 <[email protected]> Author: Christian Kurz <[email protected]> Closes #9973 from CK50/branch-1.6_non-transactional. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a589736a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a589736a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a589736a Branch: refs/heads/branch-1.6 Commit: a589736a1b237ef2f3bd59fbaeefe143ddcc8f4e Parents: 33cd171 Author: CK50 <[email protected]> Authored: Mon Nov 30 20:08:49 2015 +0800 Committer: Reynold Xin <[email protected]> Committed: Mon Nov 30 20:08:49 2015 +0800 ---------------------------------------------------------------------- .../execution/datasources/jdbc/JdbcUtils.scala | 22 +++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/a589736a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index 7375a5c..252f1cf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -21,6 +21,7 @@ import java.sql.{Connection, PreparedStatement} import java.util.Properties import scala.util.Try +import scala.util.control.NonFatal import org.apache.spark.Logging import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcType, JdbcDialects} @@ -125,8 +126,19 @@ object JdbcUtils extends Logging { dialect: JdbcDialect): Iterator[Byte] = { val conn = getConnection() var committed = false + val supportsTransactions = try { + conn.getMetaData().supportsDataManipulationTransactionsOnly() || + conn.getMetaData().supportsDataDefinitionAndDataManipulationTransactions() + } catch { + case NonFatal(e) => + logWarning("Exception while detecting transaction support", e) + true + } + try { - conn.setAutoCommit(false) // Everything in the same db transaction. + if (supportsTransactions) { + conn.setAutoCommit(false) // Everything in the same db transaction. + } val stmt = insertStatement(conn, table, rddSchema) try { var rowCount = 0 @@ -175,14 +187,18 @@ object JdbcUtils extends Logging { } finally { stmt.close() } - conn.commit() + if (supportsTransactions) { + conn.commit() + } committed = true } finally { if (!committed) { // The stage must fail. We got here through an exception path, so // let the exception through unless rollback() or close() want to // tell the user about another problem. - conn.rollback() + if (supportsTransactions) { + conn.rollback() + } conn.close() } else { // The stage must succeed. We cannot propagate any exception close() might throw. --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
