Repository: spark
Updated Branches:
  refs/heads/master 865e7cc38 -> ebdd75127


[SPARK-13498][SQL] Increment the recordsRead input metric for JDBC data source

## What changes were proposed in this pull request?
This patch brings https://github.com/apache/spark/pull/11373 up to date and
increments the recordsRead input metric for the JDBC data source.

Closes #11373.
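
As a rough illustration of the pattern (not the actual Spark code), the sketch below counts one record per row produced by an iterator. `RecordCounter`, `CountingIterator`, and `CountingIteratorDemo` are hypothetical names introduced here; `RecordCounter` is only a stand-in for the task's input metrics object, and the only method borrowed from the patch is `incRecordsRead`.

```scala
// Hypothetical stand-in for the task's input metrics; only the record counter is modeled.
final class RecordCounter {
  private var count: Long = 0L
  def incRecordsRead(n: Long): Unit = count += n
  def recordsRead: Long = count
}

// Wraps any iterator and increments the counter once per row actually produced.
final class CountingIterator[A](underlying: Iterator[A], metrics: RecordCounter)
    extends Iterator[A] {
  override def hasNext: Boolean = underlying.hasNext
  override def next(): A = {
    val row = underlying.next()
    metrics.incRecordsRead(1)
    row
  }
}

object CountingIteratorDemo {
  // Consumes a three-row iterator and returns the resulting record count.
  def run(): Long = {
    val metrics = new RecordCounter
    val rows = new CountingIterator(Iterator("a", "b", "c"), metrics)
    rows.foreach(_ => ())
    metrics.recordsRead
  }
}
```

Because the counter is bumped per row rather than once up front, rows that are never fetched are never counted, which is the same behavior the patch gets by incrementing inside `getNext()` only when `rs.next()` returns true.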

## How was this patch tested?
N/A

Author: Reynold Xin <r...@databricks.com>

Closes #13694 from rxin/SPARK-13498.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ebdd7512
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ebdd7512
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ebdd7512

Branch: refs/heads/master
Commit: ebdd7512723851934241bd87fe7b25fd60cc58d8
Parents: 865e7cc
Author: Wayne Song <ws...@memsql.com>
Authored: Wed Jun 15 20:09:47 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Wed Jun 15 20:09:47 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala  | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ebdd7512/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index 6a5564a..8d0906e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -374,6 +374,7 @@ private[sql] class JDBCRDD(
     var nextValue: InternalRow = null
 
     context.addTaskCompletionListener{ context => close() }
+    val inputMetrics = context.taskMetrics().inputMetrics
     val part = thePart.asInstanceOf[JDBCPartition]
     val conn = getConnection()
     val dialect = JdbcDialects.get(url)
@@ -398,6 +399,7 @@ private[sql] class JDBCRDD(
 
     def getNext(): InternalRow = {
       if (rs.next()) {
+        inputMetrics.incRecordsRead(1)
         var i = 0
         while (i < conversions.length) {
           val pos = i + 1

