This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d7128c32d64 [SPARK-44375][SQL] Use PartitionEvaluator API in DebugExec
d7128c32d64 is described below

commit d7128c32d645cdd795cae4b4120049007f1e50de
Author: Jia Fan <fanjiaemi...@qq.com>
AuthorDate: Wed Jul 19 11:52:18 2023 +0800

    [SPARK-44375][SQL] Use PartitionEvaluator API in DebugExec
    
    ### What changes were proposed in this pull request?
    Use PartitionEvaluator API in DebugExec
    
    ### Why are the changes needed?
    Use the new PartitionEvaluator API to avoid lambda-based distributed execution.
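    
    In short, the per-partition logic moves out of an anonymous closure passed
    to `mapPartitions` and into a named evaluator class (see
    `DebugEvaluatorFactory` below). A minimal sketch of the pattern, using a
    hypothetical `UpperCaseEvaluatorFactory` purely for illustration:
    
        import org.apache.spark.{PartitionEvaluator, PartitionEvaluatorFactory}
        import org.apache.spark.rdd.RDD
    
        // Lambda-based: the anonymous closure itself is serialized and
        // shipped to executors.
        def before(rdd: RDD[String]): RDD[String] =
          rdd.mapPartitions { iter => iter.map(_.toUpperCase) }
    
        // Evaluator-based: a named, serializable factory is shipped instead;
        // each task calls createEvaluator() and feeds it its partition.
        class UpperCaseEvaluatorFactory
            extends PartitionEvaluatorFactory[String, String] {
          override def createEvaluator(): PartitionEvaluator[String, String] =
            new PartitionEvaluator[String, String] {
              override def eval(
                  partitionIndex: Int,
                  inputs: Iterator[String]*): Iterator[String] =
                inputs.head.map(_.toUpperCase)
            }
        }
    
        def after(rdd: RDD[String]): RDD[String] =
          rdd.mapPartitionsWithEvaluator(new UpperCaseEvaluatorFactory)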
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added a new test.
    
    Closes #41949 from Hisoka-X/SPARK-44375_debug_exec.
    
    Authored-by: Jia Fan <fanjiaemi...@qq.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../execution/debug/DebugEvaluatorFactory.scala    | 58 ++++++++++++++++++++
 .../apache/spark/sql/execution/debug/package.scala | 63 +++++++++++-----------
 2 files changed, 88 insertions(+), 33 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/DebugEvaluatorFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/DebugEvaluatorFactory.scala
new file mode 100644
index 00000000000..f40e998d12f
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/DebugEvaluatorFactory.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.debug
+
+import org.apache.spark.{PartitionEvaluator, PartitionEvaluatorFactory}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.util.LongAccumulator
+
+class DebugEvaluatorFactory(
+    tupleCount: LongAccumulator,
+    numColumns: Int,
+    columnAccumulator: Array[SetAccumulator[String]],
+    output: Seq[Attribute]) extends PartitionEvaluatorFactory[InternalRow, InternalRow] {
+  override def createEvaluator(): PartitionEvaluator[InternalRow, InternalRow] = {
+    new DebugEvaluator()
+  }
+
+  class DebugEvaluator extends PartitionEvaluator[InternalRow, InternalRow] {
+    override def eval(
+        partitionIndex: Int,
+        inputs: Iterator[InternalRow]*): Iterator[InternalRow] = {
+      val input = inputs.head
+      new Iterator[InternalRow] {
+        def hasNext: Boolean = input.hasNext
+
+        def next(): InternalRow = {
+          val currentRow = input.next()
+          tupleCount.add(1)
+          var i = 0
+          while (i < numColumns) {
+            val value = currentRow.get(i, output(i).dataType)
+            if (value != null) {
+              columnAccumulator(i).add(value.getClass.getName)
+            }
+            i += 1
+          }
+          currentRow
+        }
+      }
+    }
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
index 6f796e6ca94..b4bb6aba15d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -206,25 +206,32 @@ package object debug {
     }
   }
 
-  case class DebugExec(child: SparkPlan) extends UnaryExecNode with CodegenSupport {
-    def output: Seq[Attribute] = child.output
 
-    class SetAccumulator[T] extends AccumulatorV2[T, java.util.Set[T]] {
-      private val _set = Collections.synchronizedSet(new java.util.HashSet[T]())
-      override def isZero: Boolean = _set.isEmpty
-      override def copy(): AccumulatorV2[T, java.util.Set[T]] = {
-        val newAcc = new SetAccumulator[T]()
-        newAcc._set.addAll(_set)
-        newAcc
-      }
-      override def reset(): Unit = _set.clear()
-      override def add(v: T): Unit = _set.add(v)
-      override def merge(other: AccumulatorV2[T, java.util.Set[T]]): Unit = {
-        _set.addAll(other.value)
-      }
-      override def value: java.util.Set[T] = _set
+  class SetAccumulator[T] extends AccumulatorV2[T, java.util.Set[T]] {
+    private val _set = Collections.synchronizedSet(new java.util.HashSet[T]())
+
+    override def isZero: Boolean = _set.isEmpty
+
+    override def copy(): AccumulatorV2[T, java.util.Set[T]] = {
+      val newAcc = new SetAccumulator[T]()
+      newAcc._set.addAll(_set)
+      newAcc
+    }
+
+    override def reset(): Unit = _set.clear()
+
+    override def add(v: T): Unit = _set.add(v)
+
+    override def merge(other: AccumulatorV2[T, java.util.Set[T]]): Unit = {
+      _set.addAll(other.value)
     }
 
+    override def value: java.util.Set[T] = _set
+  }
+
+  case class DebugExec(child: SparkPlan) extends UnaryExecNode with CodegenSupport {
+    def output: Seq[Attribute] = child.output
+
     /**
      * A collection of metrics for each column of output.
      */
@@ -250,23 +257,13 @@ package object debug {
     }
 
     protected override def doExecute(): RDD[InternalRow] = {
-      child.execute().mapPartitions { iter =>
-        new Iterator[InternalRow] {
-          def hasNext: Boolean = iter.hasNext
-
-          def next(): InternalRow = {
-            val currentRow = iter.next()
-            tupleCount.add(1)
-            var i = 0
-            while (i < numColumns) {
-              val value = currentRow.get(i, output(i).dataType)
-              if (value != null) {
-                columnStats(i).elementTypes.add(value.getClass.getName)
-              }
-              i += 1
-            }
-            currentRow
-          }
+      val evaluatorFactory = new DebugEvaluatorFactory(tupleCount, numColumns,
+        columnStats.map(_.elementTypes), output)
+      if (conf.usePartitionEvaluator) {
+        child.execute().mapPartitionsWithEvaluator(evaluatorFactory)
+      } else {
+        child.execute().mapPartitionsWithIndex { (index, iter) =>
+          evaluatorFactory.createEvaluator().eval(index, iter)
         }
       }
     }

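For reference, a minimal way to exercise both paths from user code (run e.g. in
spark-shell, where `spark` is in scope). This assumes the
`conf.usePartitionEvaluator` flag read above is backed by the SQL config key
"spark.sql.execution.usePartitionEvaluator"; the key name is an assumption
here, not confirmed by the diff:

    import org.apache.spark.sql.execution.debug._

    // Assumed config key: routes DebugExec through mapPartitionsWithEvaluator
    // instead of the index-based fallback.
    spark.conf.set("spark.sql.execution.usePartitionEvaluator", "true")

    // debug() wraps the physical plan nodes in DebugExec, runs the query, and
    // prints the tuple counts and per-column value types gathered by the
    // accumulators shown in the diff.
    spark.range(10).debug()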
