This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new d7128c32d64 [SPARK-44375][SQL] Use PartitionEvaluator API in DebugExec
d7128c32d64 is described below

commit d7128c32d645cdd795cae4b4120049007f1e50de
Author: Jia Fan <fanjiaemi...@qq.com>
AuthorDate: Wed Jul 19 11:52:18 2023 +0800

    [SPARK-44375][SQL] Use PartitionEvaluator API in DebugExec

    ### What changes were proposed in this pull request?
    Use PartitionEvaluator API in DebugExec

    ### Why are the changes needed?
    use new api to avoid lambda-based distributed execution

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    add new test

    Closes #41949 from Hisoka-X/SPARK-44375_debug_exec.

    Authored-by: Jia Fan <fanjiaemi...@qq.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../execution/debug/DebugEvaluatorFactory.scala    | 58 ++++++++++++++++++++
 .../apache/spark/sql/execution/debug/package.scala | 63 +++++++++++-----------
 2 files changed, 88 insertions(+), 33 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/DebugEvaluatorFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/DebugEvaluatorFactory.scala
new file mode 100644
index 00000000000..f40e998d12f
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/DebugEvaluatorFactory.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.debug
+
+import org.apache.spark.{PartitionEvaluator, PartitionEvaluatorFactory}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.util.LongAccumulator
+
+class DebugEvaluatorFactory(
+    tupleCount: LongAccumulator,
+    numColumns: Int,
+    columnAccumulator: Array[SetAccumulator[String]],
+    output: Seq[Attribute]) extends PartitionEvaluatorFactory[InternalRow, InternalRow] {
+  override def createEvaluator(): PartitionEvaluator[InternalRow, InternalRow] = {
+    new DebugEvaluator()
+  }
+
+  class DebugEvaluator extends PartitionEvaluator[InternalRow, InternalRow] {
+    override def eval(
+        partitionIndex: Int,
+        inputs: Iterator[InternalRow]*): Iterator[InternalRow] = {
+      val input = inputs.head
+      new Iterator[InternalRow] {
+        def hasNext: Boolean = input.hasNext
+
+        def next(): InternalRow = {
+          val currentRow = input.next()
+          tupleCount.add(1)
+          var i = 0
+          while (i < numColumns) {
+            val value = currentRow.get(i, output(i).dataType)
+            if (value != null) {
+              columnAccumulator(i).add(value.getClass.getName)
+            }
+            i += 1
+          }
+          currentRow
+        }
+      }
+    }
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
index 6f796e6ca94..b4bb6aba15d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -206,25 +206,32 @@ package object debug {
     }
   }
 
-  case class DebugExec(child: SparkPlan) extends UnaryExecNode with CodegenSupport {
-    def output: Seq[Attribute] = child.output
-    class SetAccumulator[T] extends AccumulatorV2[T, java.util.Set[T]] {
-      private val _set = Collections.synchronizedSet(new java.util.HashSet[T]())
-      override def isZero: Boolean = _set.isEmpty
-      override def copy(): AccumulatorV2[T, java.util.Set[T]] = {
-        val newAcc = new SetAccumulator[T]()
-        newAcc._set.addAll(_set)
-        newAcc
-      }
-      override def reset(): Unit = _set.clear()
-      override def add(v: T): Unit = _set.add(v)
-      override def merge(other: AccumulatorV2[T, java.util.Set[T]]): Unit = {
-        _set.addAll(other.value)
-      }
-      override def value: java.util.Set[T] = _set
+  class SetAccumulator[T] extends AccumulatorV2[T, java.util.Set[T]] {
+    private val _set = Collections.synchronizedSet(new java.util.HashSet[T]())
+
+    override def isZero: Boolean = _set.isEmpty
+
+    override def copy(): AccumulatorV2[T, java.util.Set[T]] = {
+      val newAcc = new SetAccumulator[T]()
+      newAcc._set.addAll(_set)
+      newAcc
+    }
+
+    override def reset(): Unit = _set.clear()
+
+    override def add(v: T): Unit = _set.add(v)
+
+    override def merge(other: AccumulatorV2[T, java.util.Set[T]]): Unit = {
+      _set.addAll(other.value)
     }
+
+    override def value: java.util.Set[T] = _set
+  }
+
+  case class DebugExec(child: SparkPlan) extends UnaryExecNode with CodegenSupport {
+    def output: Seq[Attribute] = child.output
+
     /**
      * A collection of metrics for each column of output.
      */
@@ -250,23 +257,13 @@ package object debug {
     }
 
     protected override def doExecute(): RDD[InternalRow] = {
-      child.execute().mapPartitions { iter =>
-        new Iterator[InternalRow] {
-          def hasNext: Boolean = iter.hasNext
-
-          def next(): InternalRow = {
-            val currentRow = iter.next()
-            tupleCount.add(1)
-            var i = 0
-            while (i < numColumns) {
-              val value = currentRow.get(i, output(i).dataType)
-              if (value != null) {
-                columnStats(i).elementTypes.add(value.getClass.getName)
-              }
-              i += 1
-            }
-            currentRow
-          }
+      val evaluatorFactory = new DebugEvaluatorFactory(tupleCount, numColumns,
+        columnStats.map(_.elementTypes), output)
+      if (conf.usePartitionEvaluator) {
+        child.execute().mapPartitionsWithEvaluator(evaluatorFactory)
+      } else {
+        child.execute().mapPartitionsWithIndex { (index, iter) =>
+          evaluatorFactory.createEvaluator().eval(index, iter)
        }
       }
     }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
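
For context on the API this patch adopts, here is a minimal, self-contained sketch of the same factory/evaluator pattern against the public RDD API, outside of DebugExec. It is not part of the commit above: it assumes Spark 3.5+ (where PartitionEvaluator, PartitionEvaluatorFactory and RDD.mapPartitionsWithEvaluator are available), and the names UpperCaseEvaluatorFactory and PartitionEvaluatorExample are made up purely for illustration.

// Illustrative sketch only -- not part of the commit above.
// Assumes Spark 3.5+ and an already-running SparkContext (e.g. spark-shell).
import org.apache.spark.{PartitionEvaluator, PartitionEvaluatorFactory, SparkContext}

// The factory is serialized and shipped to executors; each task asks it for an
// evaluator and hands that evaluator the partition's input iterator(s).
class UpperCaseEvaluatorFactory extends PartitionEvaluatorFactory[String, String] {
  override def createEvaluator(): PartitionEvaluator[String, String] =
    new PartitionEvaluator[String, String] {
      override def eval(
          partitionIndex: Int,
          inputs: Iterator[String]*): Iterator[String] = {
        // A map-like operator receives exactly one input iterator.
        inputs.head.map(_.toUpperCase)
      }
    }
}

object PartitionEvaluatorExample {
  def main(args: Array[String]): Unit = {
    val sc = SparkContext.getOrCreate()
    val factory = new UpperCaseEvaluatorFactory
    val rdd = sc.parallelize(Seq("a", "b", "c"), 2)

    // Evaluator-based execution, the branch DebugExec takes above when
    // conf.usePartitionEvaluator is enabled.
    val viaEvaluator = rdd.mapPartitionsWithEvaluator(factory)

    // Lambda-based fallback with the same per-partition logic, mirroring the
    // else-branch of the new doExecute().
    val viaLambda = rdd.mapPartitionsWithIndex { (index, iter) =>
      factory.createEvaluator().eval(index, iter)
    }

    // Both paths produce the uppercased elements.
    println(viaEvaluator.collect().toSeq)
    println(viaLambda.collect().toSeq)
  }
}

The point of the pattern is that the per-partition logic lives in a small serializable factory rather than in an anonymous closure, which is what lets the new DebugExec.doExecute() route one DebugEvaluatorFactory through either mapPartitionsWithEvaluator or the lambda-based fallback depending on conf.usePartitionEvaluator.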