This is an automated email from the ASF dual-hosted git repository.

yuanzhou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 44111b32c [GLUTEN-5461] FEAT: ColumnarArrowPythonEvalExec support for 
Velox backend (#5462)
44111b32c is described below

commit 44111b32c10dad32876caadeaab8459f3afceeaf
Author: Yan Ma <[email protected]>
AuthorDate: Fri Apr 26 11:05:56 2024 +0800

    [GLUTEN-5461] FEAT: ColumnarArrowPythonEvalExec support for Velox backend 
(#5462)
    
    This patch addes the ColumnarArrowPythonEvalExec support for Velox backend.
    Typical case using arrow udf w/o additional projection is supported. More 
cases will be supported in following commits
---
 .github/workflows/velox_docker.yml                 |  24 +-
 .../clickhouse/CHSparkPlanExecApi.scala            |   8 +
 .../gluten/backendsapi/velox/VeloxBackend.scala    |   2 +
 .../backendsapi/velox/VeloxSparkPlanExecApi.scala  |   9 +
 .../python/ColumnarArrowEvalPythonExec.scala       | 340 +++++++++++++++++++++
 .../python/ArrowEvalPythonExecSuite.scala          |  61 ++++
 .../gluten/backendsapi/BackendSettingsApi.scala    |   2 +
 .../gluten/backendsapi/SparkPlanExecApi.scala      |   7 +
 .../extension/columnar/TransformHintRule.scala     |  15 +-
 .../extension/columnar/TransformSingleNode.scala   |  18 +-
 .../execution/python/BasePythonRunnerShim.scala    |  26 ++
 .../execution/python/BasePythonRunnerShim.scala    |  26 ++
 .../execution/python/BasePythonRunnerShim.scala    |  26 ++
 .../execution/python/BasePythonRunnerShim.scala    |  26 ++
 14 files changed, 582 insertions(+), 8 deletions(-)

diff --git a/.github/workflows/velox_docker.yml 
b/.github/workflows/velox_docker.yml
index 20b6a5703..290d3efd3 100644
--- a/.github/workflows/velox_docker.yml
+++ b/.github/workflows/velox_docker.yml
@@ -533,7 +533,11 @@ jobs:
           wget https://github.com/apache/spark/archive/refs/tags/v3.2.2.tar.gz 
&& \
           tar --strip-components=1 -xf v3.2.2.tar.gz 
spark-3.2.2/sql/core/src/test/resources/  && \
           mkdir -p shims/spark32/spark_home/ && \
-          mv sql shims/spark32/spark_home/
+          mv sql shims/spark32/spark_home/ && \
+          dnf module -y install python39 && \
+          alternatives --set python3 /usr/bin/python3.9 && \
+          pip3 install pyspark==3.2.2 cython && \
+          pip3 install pandas pyarrow
       - name: Build and run unit test for Spark 3.2.2 (other tests)
         run: |
           cd $GITHUB_WORKSPACE/
@@ -624,7 +628,11 @@ jobs:
           wget https://github.com/apache/spark/archive/refs/tags/v3.3.1.tar.gz 
&& \
           tar --strip-components=1 -xf v3.3.1.tar.gz 
spark-3.3.1/sql/core/src/test/resources/  && \
           mkdir -p shims/spark33/spark_home/ && \
-          mv sql shims/spark33/spark_home/
+          mv sql shims/spark33/spark_home/ && \
+          dnf module -y install python39 && \
+          alternatives --set python3 /usr/bin/python3.9 && \
+          pip3 install pyspark==3.3.1 cython && \
+          pip3 install pandas pyarrow
       - name: Build and Run unit test for Spark 3.3.1 (other tests)
         run: |
           cd $GITHUB_WORKSPACE/ && \
@@ -711,7 +719,11 @@ jobs:
           wget https://github.com/apache/spark/archive/refs/tags/v3.4.2.tar.gz 
&& \
           tar --strip-components=1 -xf v3.4.2.tar.gz 
spark-3.4.2/sql/core/src/test/resources/  && \
           mkdir -p shims/spark34/spark_home/ && \
-          mv sql shims/spark34/spark_home/
+          mv sql shims/spark34/spark_home/ && \
+          dnf module -y install python39 && \
+          alternatives --set python3 /usr/bin/python3.9 && \
+          pip3 install pyspark==3.4.2 cython && \
+          pip3 install pandas pyarrow
       - name: Build and Run unit test for Spark 3.4.2 (other tests)
         run: |
           cd $GITHUB_WORKSPACE/ && \
@@ -798,7 +810,11 @@ jobs:
           wget https://github.com/apache/spark/archive/refs/tags/v3.5.1.tar.gz 
&& \
           tar --strip-components=1 -xf v3.5.1.tar.gz 
spark-3.5.1/sql/core/src/test/resources/  && \
           mkdir -p shims/spark35/spark_home/ && \
-          mv sql shims/spark35/spark_home/
+          mv sql shims/spark35/spark_home/ && \
+          dnf module -y install python39 && \
+          alternatives --set python3 /usr/bin/python3.9 && \
+          pip3 install pyspark==3.5.1 cython && \
+          pip3 install pandas pyarrow
       - name: Build and Run unit test for Spark 3.5.1 (other tests)
         run: |
           cd $GITHUB_WORKSPACE/ && \
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index 7ea12ffe7..ee8b7dd45 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -681,6 +681,14 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
     throw new GlutenNotSupportException("ColumnarWriteFilesExec is not support 
in ch backend.")
   }
 
+  override def createColumnarArrowEvalPythonExec(
+      udfs: Seq[PythonUDF],
+      resultAttrs: Seq[Attribute],
+      child: SparkPlan,
+      evalType: Int): SparkPlan = {
+    throw new GlutenNotSupportException("ColumnarArrowEvalPythonExec is not 
support in ch backend.")
+  }
+
   /**
    * Define whether the join operator is fallback because of the join operator 
is not supported by
    * backend
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 9ac7fed97..8b7cec383 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -498,4 +498,6 @@ object VeloxBackendSettings extends BackendSettingsApi {
   }
 
   override def shouldRewriteCollect(): Boolean = true
+
+  override def supportColumnarArrowUdf(): Boolean = true
 }
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
index e9bd47bec..132b5d7dd 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
@@ -30,6 +30,7 @@ import 
org.apache.gluten.substrait.expression.{ExpressionBuilder, ExpressionNode
 import org.apache.gluten.vectorized.{ColumnarBatchSerializer, 
ColumnarBatchSerializeResult}
 
 import org.apache.spark.{ShuffleDependency, SparkException}
+import org.apache.spark.api.python.ColumnarArrowEvalPythonExec
 import org.apache.spark.rdd.RDD
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.{GenShuffleWriterParameters, 
GlutenShuffleWriterWrapper}
@@ -518,6 +519,14 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
       staticPartitions)
   }
 
+  override def createColumnarArrowEvalPythonExec(
+      udfs: Seq[PythonUDF],
+      resultAttrs: Seq[Attribute],
+      child: SparkPlan,
+      evalType: Int): SparkPlan = {
+    new ColumnarArrowEvalPythonExec(udfs, resultAttrs, child, evalType)
+  }
+
   /**
    * Generate ColumnarBatchSerializer for ColumnarShuffleExchangeExec.
    *
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/execution/python/ColumnarArrowEvalPythonExec.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/execution/python/ColumnarArrowEvalPythonExec.scala
new file mode 100644
index 000000000..f2beef6ca
--- /dev/null
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/execution/python/ColumnarArrowEvalPythonExec.scala
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.api.python
+
+import org.apache.gluten.columnarbatch.ColumnarBatches
+import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.memory.arrowalloc.ArrowBufferAllocators
+import org.apache.gluten.utils.Iterators
+import org.apache.gluten.vectorized.ArrowWritableColumnVector
+
+import org.apache.spark.{ContextAwareIterator, SparkEnv, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.python.{BasePythonRunnerShim, 
EvalPythonExec, PythonUDFRunner}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.sql.utils.{SparkArrowUtil, SparkSchemaUtil, 
SparkVectorUtil}
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
+import org.apache.spark.util.Utils
+
+import org.apache.arrow.vector.{VectorLoader, VectorSchemaRoot}
+import org.apache.arrow.vector.ipc.{ArrowStreamReader, ArrowStreamWriter}
+
+import java.io.{DataInputStream, DataOutputStream}
+import java.net.Socket
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.collection.mutable.ArrayBuffer
+
+class ColumnarArrowPythonRunner(
+    funcs: Seq[ChainedPythonFunctions],
+    evalType: Int,
+    argOffsets: Array[Array[Int]],
+    schema: StructType,
+    timeZoneId: String,
+    conf: Map[String, String])
+  extends BasePythonRunnerShim(funcs, evalType, argOffsets) {
+
+  override val simplifiedTraceback: Boolean = 
SQLConf.get.pysparkSimplifiedTraceback
+
+  override val bufferSize: Int = SQLConf.get.pandasUDFBufferSize
+  require(
+    bufferSize >= 4,
+    "Pandas execution requires more than 4 bytes. Please set higher buffer. " +
+      s"Please change '${SQLConf.PANDAS_UDF_BUFFER_SIZE.key}'.")
+
+  protected def newReaderIterator(
+      stream: DataInputStream,
+      writerThread: WriterThread,
+      startTime: Long,
+      env: SparkEnv,
+      worker: Socket,
+      pid: scala.Option[scala.Int],
+      releasedOrClosed: AtomicBoolean,
+      context: TaskContext): Iterator[ColumnarBatch] = {
+
+    new ReaderIterator(
+      stream,
+      writerThread,
+      startTime,
+      env,
+      worker,
+      None,
+      releasedOrClosed,
+      context) {
+      private val allocator = ArrowBufferAllocators.contextInstance()
+
+      private var reader: ArrowStreamReader = _
+      private var root: VectorSchemaRoot = _
+      private var schema: StructType = _
+      private var vectors: Array[ColumnVector] = _
+
+      context.addTaskCompletionListener[Unit] {
+        _ =>
+          if (reader != null) {
+            reader.close(false)
+          }
+          if (root != null) {
+            root.close()
+          }
+      }
+
+      private var batchLoaded = true
+
+      override protected def read(): ColumnarBatch = {
+        if (writerThread.exception.isDefined) {
+          throw writerThread.exception.get
+        }
+        try {
+          if (reader != null && batchLoaded) {
+            batchLoaded = reader.loadNextBatch()
+            if (batchLoaded) {
+              val batch = new ColumnarBatch(vectors)
+              batch.setNumRows(root.getRowCount)
+              batch
+            } else {
+              reader.close(false)
+              // Reach end of stream. Call `read()` again to read control data.
+              read()
+            }
+          } else {
+            stream.readInt() match {
+              case SpecialLengths.START_ARROW_STREAM =>
+                reader = new ArrowStreamReader(stream, allocator)
+                root = reader.getVectorSchemaRoot()
+                schema = SparkArrowUtil.fromArrowSchema(root.getSchema())
+                vectors = ArrowWritableColumnVector
+                  .loadColumns(root.getRowCount, root.getFieldVectors)
+                  .toArray[ColumnVector]
+                read()
+              case SpecialLengths.TIMING_DATA =>
+                handleTimingData()
+                read()
+              case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
+                throw handlePythonException()
+              case SpecialLengths.END_OF_DATA_SECTION =>
+                handleEndOfDataSection()
+                null
+            }
+          }
+        } catch handleException
+      }
+    }
+  }
+
+  override protected def newWriterThread(
+      env: SparkEnv,
+      worker: Socket,
+      inputIterator: Iterator[ColumnarBatch],
+      partitionIndex: Int,
+      context: TaskContext): WriterThread = {
+    new WriterThread(env, worker, inputIterator, partitionIndex, context) {
+      override protected def writeCommand(dataOut: DataOutputStream): Unit = {
+        // Write config for the worker as a number of key -> value pairs of 
strings
+        dataOut.writeInt(conf.size)
+        for ((k, v) <- conf) {
+          PythonRDD.writeUTF(k, dataOut)
+          PythonRDD.writeUTF(v, dataOut)
+        }
+        PythonUDFRunner.writeUDFs(dataOut, funcs, argOffsets)
+      }
+
+      override protected def writeIteratorToStream(dataOut: DataOutputStream): 
Unit = {
+        var numRows: Long = 0
+        val arrowSchema = SparkSchemaUtil.toArrowSchema(schema, timeZoneId)
+        val allocator = ArrowBufferAllocators.contextInstance()
+        val root = VectorSchemaRoot.create(arrowSchema, allocator)
+
+        Utils.tryWithSafeFinally {
+          val loader = new VectorLoader(root)
+          val writer = new ArrowStreamWriter(root, null, dataOut)
+          writer.start()
+          while (inputIterator.hasNext) {
+            val nextBatch = inputIterator.next()
+            numRows += nextBatch.numRows
+
+            val cols = (0 until nextBatch.numCols).toList.map(
+              i =>
+                nextBatch
+                  .asInstanceOf[ColumnarBatch]
+                  .column(i)
+                  .asInstanceOf[ArrowWritableColumnVector]
+                  .getValueVector)
+            val nextRecordBatch =
+              SparkVectorUtil.toArrowRecordBatch(nextBatch.numRows, cols)
+            loader.load(nextRecordBatch)
+            writer.writeBatch()
+            if (nextRecordBatch != null) {
+              nextRecordBatch.close()
+            }
+          }
+          // end writes footer to the output stream and doesn't clean any 
resources.
+          // It could throw exception if the output stream is closed, so it 
should be
+          // in the try block.
+          writer.end()
+        } {
+          root.close()
+          // allocator can't close now or the data will loss
+          // allocator.close()
+        }
+      }
+    }
+  }
+}
+
+case class ColumnarArrowEvalPythonExec(
+    udfs: Seq[PythonUDF],
+    resultAttrs: Seq[Attribute],
+    child: SparkPlan,
+    evalType: Int)
+  extends EvalPythonExec
+  with GlutenPlan {
+  override def supportsColumnar: Boolean = true
+  // TODO: add additional projection support by pre-project
+  // FIXME: incorrect metrics updater
+
+  override protected def evaluate(
+      funcs: Seq[ChainedPythonFunctions],
+      argOffsets: Array[Array[Int]],
+      iter: Iterator[InternalRow],
+      schema: StructType,
+      context: TaskContext): Iterator[InternalRow] = {
+    throw new IllegalStateException(
+      "ColumnarArrowEvalPythonExec doesn't support evaluate InternalRow.")
+  }
+
+  private val sessionLocalTimeZone = conf.sessionLocalTimeZone
+  private def getPythonRunnerConfMap(conf: SQLConf): Map[String, String] = {
+    val timeZoneConf = Seq(SQLConf.SESSION_LOCAL_TIMEZONE.key -> 
conf.sessionLocalTimeZone)
+    val pandasColsByName = Seq(
+      SQLConf.PANDAS_GROUPED_MAP_ASSIGN_COLUMNS_BY_NAME.key ->
+        conf.pandasGroupedMapAssignColumnsByName.toString)
+    val arrowSafeTypeCheck = Seq(
+      SQLConf.PANDAS_ARROW_SAFE_TYPE_CONVERSION.key ->
+        conf.arrowSafeTypeConversion.toString)
+    Map(timeZoneConf ++ pandasColsByName ++ arrowSafeTypeCheck: _*)
+  }
+  private val pythonRunnerConf = getPythonRunnerConfMap(conf)
+
+  protected def evaluateColumnar(
+      funcs: Seq[ChainedPythonFunctions],
+      argOffsets: Array[Array[Int]],
+      iter: Iterator[ColumnarBatch],
+      schema: StructType,
+      context: TaskContext): Iterator[ColumnarBatch] = {
+
+    val outputTypes = output.drop(child.output.length).map(_.dataType)
+
+    val columnarBatchIter = new ColumnarArrowPythonRunner(
+      funcs,
+      evalType,
+      argOffsets,
+      schema,
+      sessionLocalTimeZone,
+      pythonRunnerConf).compute(iter, context.partitionId(), context)
+
+    columnarBatchIter.map {
+      batch =>
+        val actualDataTypes = (0 until batch.numCols()).map(i => 
batch.column(i).dataType())
+        assert(
+          outputTypes == actualDataTypes,
+          "Invalid schema from arrow_udf: " +
+            s"expected ${outputTypes.mkString(", ")}, got 
${actualDataTypes.mkString(", ")}")
+        batch
+    }
+  }
+
+  private def collectFunctions(udf: PythonUDF): (ChainedPythonFunctions, 
Seq[Expression]) = {
+    udf.children match {
+      case Seq(u: PythonUDF) =>
+        val (chained, children) = collectFunctions(u)
+        (ChainedPythonFunctions(chained.funcs ++ Seq(udf.func)), children)
+      case children =>
+        // There should not be any other UDFs, or the children can't be 
evaluated directly.
+        assert(children.forall(_.find(_.isInstanceOf[PythonUDF]).isEmpty))
+        (ChainedPythonFunctions(Seq(udf.func)), udf.children)
+    }
+  }
+
+  override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
+    val inputRDD = child.executeColumnar()
+    inputRDD.mapPartitions {
+      iter =>
+        val context = TaskContext.get()
+        val (pyFuncs, inputs) = udfs.map(collectFunctions).unzip
+        // flatten all the arguments
+        val allInputs = new ArrayBuffer[Expression]
+        val dataTypes = new ArrayBuffer[DataType]
+        val argOffsets = inputs.map {
+          input =>
+            input.map {
+              e =>
+                if (allInputs.exists(_.semanticEquals(e))) {
+                  allInputs.indexWhere(_.semanticEquals(e))
+                } else {
+                  allInputs += e
+                  dataTypes += e.dataType
+                  allInputs.length - 1
+                }
+            }.toArray
+        }.toArray
+        val schema = StructType(dataTypes.zipWithIndex.map {
+          case (dt, i) =>
+            StructField(s"_$i", dt)
+        }.toSeq)
+        val contextAwareIterator = new ContextAwareIterator(context, iter)
+        val inputCbCache = new ArrayBuffer[ColumnarBatch]()
+        val inputBatchIter = contextAwareIterator.map {
+          inputCb =>
+            
ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance, inputCb)
+            // 0. cache input for later merge
+            ColumnarBatches.retain(inputCb)
+            inputCbCache += inputCb
+            inputCb
+        }
+
+        val outputColumnarBatchIterator =
+          evaluateColumnar(pyFuncs, argOffsets, inputBatchIter, schema, 
context)
+        val res =
+          outputColumnarBatchIterator.zipWithIndex.map {
+            case (outputCb, batchId) =>
+              val inputCb = inputCbCache(batchId)
+              val joinedVectors = (0 until inputCb.numCols).toArray.map(
+                i => inputCb.column(i)) ++ (0 until 
outputCb.numCols).toArray.map(
+                i => outputCb.column(i))
+              val numRows = inputCb.numRows
+              val batch = new ColumnarBatch(joinedVectors, numRows)
+              val offloaded =
+                
ColumnarBatches.ensureOffloaded(ArrowBufferAllocators.contextInstance, batch)
+              ColumnarBatches.release(outputCb)
+              offloaded
+          }
+        Iterators
+          .wrap(res)
+          .recycleIterator {
+            inputCbCache.foreach(ColumnarBatches.release(_))
+          }
+          .recyclePayload(_.close())
+          .create()
+    }
+  }
+  override protected def withNewChildInternal(newChild: SparkPlan): 
ColumnarArrowEvalPythonExec =
+    copy(udfs, resultAttrs, newChild)
+}
diff --git 
a/backends-velox/src/test/scala/org/apache/gluten/execution/python/ArrowEvalPythonExecSuite.scala
 
b/backends-velox/src/test/scala/org/apache/gluten/execution/python/ArrowEvalPythonExecSuite.scala
new file mode 100644
index 000000000..2193448b4
--- /dev/null
+++ 
b/backends-velox/src/test/scala/org/apache/gluten/execution/python/ArrowEvalPythonExecSuite.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution.python
+
+import org.apache.gluten.execution.WholeStageTransformerSuite
+
+import org.apache.spark.SparkConf
+import org.apache.spark.api.python.ColumnarArrowEvalPythonExec
+import org.apache.spark.sql.IntegratedUDFTestUtils
+
+class ArrowEvalPythonExecSuite extends WholeStageTransformerSuite {
+
+  import IntegratedUDFTestUtils._
+  import testImplicits.localSeqToDatasetHolder
+  import testImplicits.newProductEncoder
+
+  override protected val resourcePath: String = "/tpch-data-parquet-velox"
+  override protected val fileFormat: String = "parquet"
+  val pyarrowTestUDF = TestScalarPandasUDF(name = "pyarrowUDF")
+
+  override def sparkConf: SparkConf = {
+    super.sparkConf
+      .set("spark.sql.shuffle.partitions", "1")
+      .set("spark.default.parallelism", "1")
+      .set("spark.executor.cores", "1")
+  }
+
+  test("arrow_udf test") {
+    lazy val base =
+      Seq(("1", 1), ("1", 2), ("2", 1), ("2", 2), ("3", 1), ("3", 2), ("0", 
1), ("3", 0))
+        .toDF("a", "b")
+    lazy val expected = Seq(
+      ("1", "1"),
+      ("1", "1"),
+      ("2", "2"),
+      ("2", "2"),
+      ("3", "3"),
+      ("3", "3"),
+      ("0", "0"),
+      ("3", "3")
+    ).toDF("a", "p_a")
+
+    val df2 = base.select("a").withColumn("p_a", pyarrowTestUDF(base("a")))
+    checkSparkOperatorMatch[ColumnarArrowEvalPythonExec](df2)
+    checkAnswer(df2, expected)
+  }
+}
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index 5e8b347b3..fa795b84b 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -142,4 +142,6 @@ trait BackendSettingsApi {
   def shouldRewriteTypedImperativeAggregate(): Boolean = false
 
   def shouldRewriteCollect(): Boolean = false
+
+  def supportColumnarArrowUdf(): Boolean = false
 }
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
index 1cffe3cb1..b1ca34676 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
@@ -359,6 +359,13 @@ trait SparkPlanExecApi {
       options: Map[String, String],
       staticPartitions: TablePartitionSpec): SparkPlan
 
+  /** Create ColumnarArrowEvalPythonExec, for velox backend */
+  def createColumnarArrowEvalPythonExec(
+      udfs: Seq[PythonUDF],
+      resultAttrs: Seq[Attribute],
+      child: SparkPlan,
+      evalType: Int): SparkPlan
+
   /**
    * Generate extended DataSourceV2 Strategies. Currently only for ClickHouse 
backend.
    *
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/TransformHintRule.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/TransformHintRule.scala
index 1bd0d3b93..df1c421b4 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/TransformHintRule.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/TransformHintRule.scala
@@ -38,7 +38,7 @@ import 
org.apache.spark.sql.execution.datasources.WriteFilesExec
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.execution.exchange._
 import org.apache.spark.sql.execution.joins._
-import org.apache.spark.sql.execution.python.EvalPythonExec
+import org.apache.spark.sql.execution.python.{ArrowEvalPythonExec, 
BatchEvalPythonExec}
 import org.apache.spark.sql.execution.window.{WindowExec, 
WindowGroupLimitExecShim}
 import org.apache.spark.sql.hive.HiveTableScanExecTransformer
 import org.apache.spark.sql.types.StringType
@@ -502,9 +502,20 @@ case class AddTransformHintRule() extends Rule[SparkPlan] {
             plan.generatorOutput,
             plan.child)
           transformer.doValidate().tagOnFallback(plan)
-        case plan: EvalPythonExec =>
+        case plan: BatchEvalPythonExec =>
           val transformer = EvalPythonExecTransformer(plan.udfs, 
plan.resultAttrs, plan.child)
           transformer.doValidate().tagOnFallback(plan)
+        case plan: ArrowEvalPythonExec =>
+          // When backend doesn't support ColumnarArrow or colunmnar arrow 
configuration not
+          // enabled, we will try offloading through EvalPythonExecTransformer
+          if (
+            !BackendsApiManager.getSettings.supportColumnarArrowUdf() ||
+            !GlutenConfig.getConf.enableColumnarArrowUDF
+          ) {
+            // Both CH and Velox will try using backend's built-in functions 
for calculate
+            val transformer = EvalPythonExecTransformer(plan.udfs, 
plan.resultAttrs, plan.child)
+            transformer.doValidate().tagOnFallback(plan)
+          }
         case plan: TakeOrderedAndProjectExec =>
           val (limit, offset) =
             SparkShimLoader.getSparkShims.getLimitAndOffsetFromTopK(plan)
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/TransformSingleNode.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/TransformSingleNode.scala
index bc1276d63..b8f99330e 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/TransformSingleNode.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/TransformSingleNode.scala
@@ -36,7 +36,7 @@ import 
org.apache.spark.sql.execution.datasources.WriteFilesExec
 import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan}
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, 
ShuffleExchangeExec}
 import org.apache.spark.sql.execution.joins._
-import org.apache.spark.sql.execution.python.EvalPythonExec
+import org.apache.spark.sql.execution.python.{ArrowEvalPythonExec, 
BatchEvalPythonExec}
 import org.apache.spark.sql.execution.window.{WindowExec, 
WindowGroupLimitExecShim}
 import org.apache.spark.sql.hive.HiveTableScanExecTransformer
 
@@ -439,10 +439,24 @@ object TransformOthers {
             plan.outer,
             plan.generatorOutput,
             child)
-        case plan: EvalPythonExec =>
+        case plan: BatchEvalPythonExec =>
           logDebug(s"Columnar Processing for ${plan.getClass} is currently 
supported.")
           val child = plan.child
           EvalPythonExecTransformer(plan.udfs, plan.resultAttrs, child)
+        case plan: ArrowEvalPythonExec =>
+          logDebug(s"Columnar Processing for ${plan.getClass} is currently 
supported.")
+          val child = plan.child
+          // For ArrowEvalPythonExec, CH supports it through 
EvalPythonExecTransformer while
+          // Velox backend uses ColumnarArrowEvalPythonExec.
+          if (!BackendsApiManager.getSettings.supportColumnarArrowUdf()) {
+            EvalPythonExecTransformer(plan.udfs, plan.resultAttrs, child)
+          } else {
+            
BackendsApiManager.getSparkPlanExecApiInstance.createColumnarArrowEvalPythonExec(
+              plan.udfs,
+              plan.resultAttrs,
+              child,
+              plan.evalType)
+          }
         case p if !p.isInstanceOf[GlutenPlan] =>
           logDebug(s"Transformation for ${p.getClass} is currently not 
supported.")
           val children = plan.children
diff --git 
a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/python/BasePythonRunnerShim.scala
 
b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/python/BasePythonRunnerShim.scala
new file mode 100644
index 000000000..0ec04defd
--- /dev/null
+++ 
b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/python/BasePythonRunnerShim.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.python
+
+import org.apache.spark.api.python.{BasePythonRunner, ChainedPythonFunctions}
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+abstract class BasePythonRunnerShim(
+    funcs: Seq[ChainedPythonFunctions],
+    evalType: Int,
+    argOffsets: Array[Array[Int]])
+  extends BasePythonRunner[ColumnarBatch, ColumnarBatch](funcs, evalType, 
argOffsets) {}
diff --git 
a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/python/BasePythonRunnerShim.scala
 
b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/python/BasePythonRunnerShim.scala
new file mode 100644
index 000000000..0ec04defd
--- /dev/null
+++ 
b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/python/BasePythonRunnerShim.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.python
+
+import org.apache.spark.api.python.{BasePythonRunner, ChainedPythonFunctions}
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+abstract class BasePythonRunnerShim(
+    funcs: Seq[ChainedPythonFunctions],
+    evalType: Int,
+    argOffsets: Array[Array[Int]])
+  extends BasePythonRunner[ColumnarBatch, ColumnarBatch](funcs, evalType, 
argOffsets) {}
diff --git 
a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/python/BasePythonRunnerShim.scala
 
b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/python/BasePythonRunnerShim.scala
new file mode 100644
index 000000000..0ec04defd
--- /dev/null
+++ 
b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/python/BasePythonRunnerShim.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.python
+
+import org.apache.spark.api.python.{BasePythonRunner, ChainedPythonFunctions}
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+abstract class BasePythonRunnerShim(
+    funcs: Seq[ChainedPythonFunctions],
+    evalType: Int,
+    argOffsets: Array[Array[Int]])
+  extends BasePythonRunner[ColumnarBatch, ColumnarBatch](funcs, evalType, 
argOffsets) {}
diff --git 
a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/python/BasePythonRunnerShim.scala
 
b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/python/BasePythonRunnerShim.scala
new file mode 100644
index 000000000..0685b3792
--- /dev/null
+++ 
b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/python/BasePythonRunnerShim.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.python
+
+import org.apache.spark.api.python.{BasePythonRunner, ChainedPythonFunctions}
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+abstract class BasePythonRunnerShim(
+    funcs: Seq[ChainedPythonFunctions],
+    evalType: Int,
+    argOffsets: Array[Array[Int]])
+  extends BasePythonRunner[ColumnarBatch, ColumnarBatch](funcs, evalType, 
argOffsets, None) {}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to