Repository: spark
Updated Branches:
refs/heads/master 52e00f706 -> 2ac895be9
[SPARK-23247][SQL] combines Unsafe operations and statistics operations in Scan
Data Source
## What changes were proposed in this pull request?
Currently, we scan the execution plan of the data source, first the unsafe
operation of each row of data, and then re traverse the data for the count of
rows. In terms of performance, this is not necessary. this PR combines the two
operations and makes statistics on the number of rows while performing the
unsafe operation.
Before modified,
```
val unsafeRow = rdd.mapPartitionsWithIndexInternal { (index, iter) =>
val proj = UnsafeProjection.create(schema)
proj.initialize(index)
iter.map(proj)
}
val numOutputRows = longMetric("numOutputRows")
unsafeRow.map { r =>
numOutputRows += 1
r
}
```
After modified,
val numOutputRows = longMetric("numOutputRows")
rdd.mapPartitionsWithIndexInternal { (index, iter) =>
val proj = UnsafeProjection.create(schema)
proj.initialize(index)
iter.map( r => {
numOutputRows += 1
proj(r)
})
}
## How was this patch tested?
the existed test cases.
Author: caoxuewen <[email protected]>
Closes #20415 from heary-cao/DataSourceScanExec.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2ac895be
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2ac895be
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2ac895be
Branch: refs/heads/master
Commit: 2ac895be909de7e58e1051dc2a1bba98a25bf4be
Parents: 52e00f7
Author: caoxuewen <[email protected]>
Authored: Thu Feb 1 12:05:12 2018 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Thu Feb 1 12:05:12 2018 +0800
----------------------------------------------------------------------
.../sql/execution/DataSourceScanExec.scala | 45 ++++++++++----------
1 file changed, 22 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/2ac895be/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index f7732e2..ba1157d 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -90,16 +90,15 @@ case class RowDataSourceScanExec(
Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of
output rows"))
protected override def doExecute(): RDD[InternalRow] = {
- val unsafeRow = rdd.mapPartitionsWithIndexInternal { (index, iter) =>
+ val numOutputRows = longMetric("numOutputRows")
+
+ rdd.mapPartitionsWithIndexInternal { (index, iter) =>
val proj = UnsafeProjection.create(schema)
proj.initialize(index)
- iter.map(proj)
- }
-
- val numOutputRows = longMetric("numOutputRows")
- unsafeRow.map { r =>
- numOutputRows += 1
- r
+ iter.map( r => {
+ numOutputRows += 1
+ proj(r)
+ })
}
}
@@ -326,22 +325,22 @@ case class FileSourceScanExec(
// 2) the number of columns should be smaller than
spark.sql.codegen.maxFields
WholeStageCodegenExec(this)(codegenStageId = 0).execute()
} else {
- val unsafeRows = {
- val scan = inputRDD
- if (needsUnsafeRowConversion) {
- scan.mapPartitionsWithIndexInternal { (index, iter) =>
- val proj = UnsafeProjection.create(schema)
- proj.initialize(index)
- iter.map(proj)
- }
- } else {
- scan
- }
- }
val numOutputRows = longMetric("numOutputRows")
- unsafeRows.map { r =>
- numOutputRows += 1
- r
+
+ if (needsUnsafeRowConversion) {
+ inputRDD.mapPartitionsWithIndexInternal { (index, iter) =>
+ val proj = UnsafeProjection.create(schema)
+ proj.initialize(index)
+ iter.map( r => {
+ numOutputRows += 1
+ proj(r)
+ })
+ }
+ } else {
+ inputRDD.map { r =>
+ numOutputRows += 1
+ r
+ }
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]