Repository: spark Updated Branches: refs/heads/master 73d80ec49 -> 02218c4c7
[SPARK-22251][SQL] Metric 'aggregate time' is incorrect when codegen is off ## What changes were proposed in this pull request? Adding the code for setting the 'aggregate time' metric to the non-codegen path in HashAggregateExec and to ObjectHashAggregateExec. ## How was this patch tested? Tested manually. Author: Ala Luszczak <[email protected]> Closes #19473 from ala/fix-agg-time. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/02218c4c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/02218c4c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/02218c4c Branch: refs/heads/master Commit: 02218c4c73c32741390d9906b6190ef2124ce518 Parents: 73d80ec Author: Ala Luszczak <[email protected]> Authored: Thu Oct 12 17:00:22 2017 +0200 Committer: Herman van Hovell <[email protected]> Committed: Thu Oct 12 17:00:22 2017 +0200 ---------------------------------------------------------------------- .../spark/sql/execution/aggregate/HashAggregateExec.scala | 6 +++++- .../sql/execution/aggregate/ObjectHashAggregateExec.scala | 9 +++++++-- 2 files changed, 12 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/02218c4c/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala index 8b573fd..43e5ff8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala @@ -95,11 +95,13 @@ case class HashAggregateExec( val peakMemory = longMetric("peakMemory") val spillSize = longMetric("spillSize") 
val avgHashProbe = longMetric("avgHashProbe") + val aggTime = longMetric("aggTime") child.execute().mapPartitionsWithIndex { (partIndex, iter) => + val beforeAgg = System.nanoTime() val hasInput = iter.hasNext - if (!hasInput && groupingExpressions.nonEmpty) { + val res = if (!hasInput && groupingExpressions.nonEmpty) { // This is a grouped aggregate and the input iterator is empty, // so return an empty iterator. Iterator.empty @@ -128,6 +130,8 @@ case class HashAggregateExec( aggregationIterator } } + aggTime += (System.nanoTime() - beforeAgg) / 1000000 + res } } http://git-wip-us.apache.org/repos/asf/spark/blob/02218c4c/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala index 6316e06..ec3f9a0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala @@ -76,7 +76,8 @@ case class ObjectHashAggregateExec( aggregateExpressions.flatMap(_.aggregateFunction.inputAggBufferAttributes) override lazy val metrics = Map( - "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows") + "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), + "aggTime" -> SQLMetrics.createTimingMetric(sparkContext, "aggregate time") ) override def output: Seq[Attribute] = resultExpressions.map(_.toAttribute) @@ -96,11 +97,13 @@ case class ObjectHashAggregateExec( protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") { val numOutputRows = longMetric("numOutputRows") + val aggTime = longMetric("aggTime") val fallbackCountThreshold = 
sqlContext.conf.objectAggSortBasedFallbackThreshold child.execute().mapPartitionsWithIndexInternal { (partIndex, iter) => + val beforeAgg = System.nanoTime() val hasInput = iter.hasNext - if (!hasInput && groupingExpressions.nonEmpty) { + val res = if (!hasInput && groupingExpressions.nonEmpty) { // This is a grouped aggregate and the input kvIterator is empty, // so return an empty kvIterator. Iterator.empty @@ -127,6 +130,8 @@ case class ObjectHashAggregateExec( aggregationIterator } } + aggTime += (System.nanoTime() - beforeAgg) / 1000000 + res } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
