This is an automated email from the ASF dual-hosted git repository.
yuanzhou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 44111b32c [GLUTEN-5461] FEAT: ColumnarArrowPythonEvalExec support for
Velox backend (#5462)
44111b32c is described below
commit 44111b32c10dad32876caadeaab8459f3afceeaf
Author: Yan Ma <[email protected]>
AuthorDate: Fri Apr 26 11:05:56 2024 +0800
[GLUTEN-5461] FEAT: ColumnarArrowPythonEvalExec support for Velox backend
(#5462)
This patch adds the ColumnarArrowPythonEvalExec support for Velox backend.
Typical case using arrow udf w/o additional projection is supported. More
cases will be supported in following commits
---
.github/workflows/velox_docker.yml | 24 +-
.../clickhouse/CHSparkPlanExecApi.scala | 8 +
.../gluten/backendsapi/velox/VeloxBackend.scala | 2 +
.../backendsapi/velox/VeloxSparkPlanExecApi.scala | 9 +
.../python/ColumnarArrowEvalPythonExec.scala | 340 +++++++++++++++++++++
.../python/ArrowEvalPythonExecSuite.scala | 61 ++++
.../gluten/backendsapi/BackendSettingsApi.scala | 2 +
.../gluten/backendsapi/SparkPlanExecApi.scala | 7 +
.../extension/columnar/TransformHintRule.scala | 15 +-
.../extension/columnar/TransformSingleNode.scala | 18 +-
.../execution/python/BasePythonRunnerShim.scala | 26 ++
.../execution/python/BasePythonRunnerShim.scala | 26 ++
.../execution/python/BasePythonRunnerShim.scala | 26 ++
.../execution/python/BasePythonRunnerShim.scala | 26 ++
14 files changed, 582 insertions(+), 8 deletions(-)
diff --git a/.github/workflows/velox_docker.yml
b/.github/workflows/velox_docker.yml
index 20b6a5703..290d3efd3 100644
--- a/.github/workflows/velox_docker.yml
+++ b/.github/workflows/velox_docker.yml
@@ -533,7 +533,11 @@ jobs:
wget https://github.com/apache/spark/archive/refs/tags/v3.2.2.tar.gz
&& \
tar --strip-components=1 -xf v3.2.2.tar.gz
spark-3.2.2/sql/core/src/test/resources/ && \
mkdir -p shims/spark32/spark_home/ && \
- mv sql shims/spark32/spark_home/
+ mv sql shims/spark32/spark_home/ && \
+ dnf module -y install python39 && \
+ alternatives --set python3 /usr/bin/python3.9 && \
+ pip3 install pyspark==3.2.2 cython && \
+ pip3 install pandas pyarrow
- name: Build and run unit test for Spark 3.2.2 (other tests)
run: |
cd $GITHUB_WORKSPACE/
@@ -624,7 +628,11 @@ jobs:
wget https://github.com/apache/spark/archive/refs/tags/v3.3.1.tar.gz
&& \
tar --strip-components=1 -xf v3.3.1.tar.gz
spark-3.3.1/sql/core/src/test/resources/ && \
mkdir -p shims/spark33/spark_home/ && \
- mv sql shims/spark33/spark_home/
+ mv sql shims/spark33/spark_home/ && \
+ dnf module -y install python39 && \
+ alternatives --set python3 /usr/bin/python3.9 && \
+ pip3 install pyspark==3.3.1 cython && \
+ pip3 install pandas pyarrow
- name: Build and Run unit test for Spark 3.3.1 (other tests)
run: |
cd $GITHUB_WORKSPACE/ && \
@@ -711,7 +719,11 @@ jobs:
wget https://github.com/apache/spark/archive/refs/tags/v3.4.2.tar.gz
&& \
tar --strip-components=1 -xf v3.4.2.tar.gz
spark-3.4.2/sql/core/src/test/resources/ && \
mkdir -p shims/spark34/spark_home/ && \
- mv sql shims/spark34/spark_home/
+ mv sql shims/spark34/spark_home/ && \
+ dnf module -y install python39 && \
+ alternatives --set python3 /usr/bin/python3.9 && \
+ pip3 install pyspark==3.4.2 cython && \
+ pip3 install pandas pyarrow
- name: Build and Run unit test for Spark 3.4.2 (other tests)
run: |
cd $GITHUB_WORKSPACE/ && \
@@ -798,7 +810,11 @@ jobs:
wget https://github.com/apache/spark/archive/refs/tags/v3.5.1.tar.gz
&& \
tar --strip-components=1 -xf v3.5.1.tar.gz
spark-3.5.1/sql/core/src/test/resources/ && \
mkdir -p shims/spark35/spark_home/ && \
- mv sql shims/spark35/spark_home/
+ mv sql shims/spark35/spark_home/ && \
+ dnf module -y install python39 && \
+ alternatives --set python3 /usr/bin/python3.9 && \
+ pip3 install pyspark==3.5.1 cython && \
+ pip3 install pandas pyarrow
- name: Build and Run unit test for Spark 3.5.1 (other tests)
run: |
cd $GITHUB_WORKSPACE/ && \
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index 7ea12ffe7..ee8b7dd45 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -681,6 +681,14 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
throw new GlutenNotSupportException("ColumnarWriteFilesExec is not support
in ch backend.")
}
+ override def createColumnarArrowEvalPythonExec(
+ udfs: Seq[PythonUDF],
+ resultAttrs: Seq[Attribute],
+ child: SparkPlan,
+ evalType: Int): SparkPlan = {
+ throw new GlutenNotSupportException("ColumnarArrowEvalPythonExec is not
support in ch backend.")
+ }
+
/**
* Define whether the join operator is fallback because of the join operator
is not supported by
* backend
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 9ac7fed97..8b7cec383 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -498,4 +498,6 @@ object VeloxBackendSettings extends BackendSettingsApi {
}
override def shouldRewriteCollect(): Boolean = true
+
+ override def supportColumnarArrowUdf(): Boolean = true
}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
index e9bd47bec..132b5d7dd 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
@@ -30,6 +30,7 @@ import
org.apache.gluten.substrait.expression.{ExpressionBuilder, ExpressionNode
import org.apache.gluten.vectorized.{ColumnarBatchSerializer,
ColumnarBatchSerializeResult}
import org.apache.spark.{ShuffleDependency, SparkException}
+import org.apache.spark.api.python.ColumnarArrowEvalPythonExec
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.{GenShuffleWriterParameters,
GlutenShuffleWriterWrapper}
@@ -518,6 +519,14 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
staticPartitions)
}
+ override def createColumnarArrowEvalPythonExec(
+ udfs: Seq[PythonUDF],
+ resultAttrs: Seq[Attribute],
+ child: SparkPlan,
+ evalType: Int): SparkPlan = {
+ new ColumnarArrowEvalPythonExec(udfs, resultAttrs, child, evalType)
+ }
+
/**
* Generate ColumnarBatchSerializer for ColumnarShuffleExchangeExec.
*
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/execution/python/ColumnarArrowEvalPythonExec.scala
b/backends-velox/src/main/scala/org/apache/gluten/execution/python/ColumnarArrowEvalPythonExec.scala
new file mode 100644
index 000000000..f2beef6ca
--- /dev/null
+++
b/backends-velox/src/main/scala/org/apache/gluten/execution/python/ColumnarArrowEvalPythonExec.scala
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.api.python
+
+import org.apache.gluten.columnarbatch.ColumnarBatches
+import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.memory.arrowalloc.ArrowBufferAllocators
+import org.apache.gluten.utils.Iterators
+import org.apache.gluten.vectorized.ArrowWritableColumnVector
+
+import org.apache.spark.{ContextAwareIterator, SparkEnv, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.python.{BasePythonRunnerShim,
EvalPythonExec, PythonUDFRunner}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.sql.utils.{SparkArrowUtil, SparkSchemaUtil,
SparkVectorUtil}
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
+import org.apache.spark.util.Utils
+
+import org.apache.arrow.vector.{VectorLoader, VectorSchemaRoot}
+import org.apache.arrow.vector.ipc.{ArrowStreamReader, ArrowStreamWriter}
+
+import java.io.{DataInputStream, DataOutputStream}
+import java.net.Socket
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.collection.mutable.ArrayBuffer
+
+class ColumnarArrowPythonRunner(
+ funcs: Seq[ChainedPythonFunctions],
+ evalType: Int,
+ argOffsets: Array[Array[Int]],
+ schema: StructType,
+ timeZoneId: String,
+ conf: Map[String, String])
+ extends BasePythonRunnerShim(funcs, evalType, argOffsets) {
+
+ override val simplifiedTraceback: Boolean =
SQLConf.get.pysparkSimplifiedTraceback
+
+ override val bufferSize: Int = SQLConf.get.pandasUDFBufferSize
+ require(
+ bufferSize >= 4,
+ "Pandas execution requires more than 4 bytes. Please set higher buffer. " +
+ s"Please change '${SQLConf.PANDAS_UDF_BUFFER_SIZE.key}'.")
+
+ protected def newReaderIterator(
+ stream: DataInputStream,
+ writerThread: WriterThread,
+ startTime: Long,
+ env: SparkEnv,
+ worker: Socket,
+ pid: scala.Option[scala.Int],
+ releasedOrClosed: AtomicBoolean,
+ context: TaskContext): Iterator[ColumnarBatch] = {
+
+ new ReaderIterator(
+ stream,
+ writerThread,
+ startTime,
+ env,
+ worker,
+ None,
+ releasedOrClosed,
+ context) {
+ private val allocator = ArrowBufferAllocators.contextInstance()
+
+ private var reader: ArrowStreamReader = _
+ private var root: VectorSchemaRoot = _
+ private var schema: StructType = _
+ private var vectors: Array[ColumnVector] = _
+
+ context.addTaskCompletionListener[Unit] {
+ _ =>
+ if (reader != null) {
+ reader.close(false)
+ }
+ if (root != null) {
+ root.close()
+ }
+ }
+
+ private var batchLoaded = true
+
+ override protected def read(): ColumnarBatch = {
+ if (writerThread.exception.isDefined) {
+ throw writerThread.exception.get
+ }
+ try {
+ if (reader != null && batchLoaded) {
+ batchLoaded = reader.loadNextBatch()
+ if (batchLoaded) {
+ val batch = new ColumnarBatch(vectors)
+ batch.setNumRows(root.getRowCount)
+ batch
+ } else {
+ reader.close(false)
+ // Reach end of stream. Call `read()` again to read control data.
+ read()
+ }
+ } else {
+ stream.readInt() match {
+ case SpecialLengths.START_ARROW_STREAM =>
+ reader = new ArrowStreamReader(stream, allocator)
+ root = reader.getVectorSchemaRoot()
+ schema = SparkArrowUtil.fromArrowSchema(root.getSchema())
+ vectors = ArrowWritableColumnVector
+ .loadColumns(root.getRowCount, root.getFieldVectors)
+ .toArray[ColumnVector]
+ read()
+ case SpecialLengths.TIMING_DATA =>
+ handleTimingData()
+ read()
+ case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
+ throw handlePythonException()
+ case SpecialLengths.END_OF_DATA_SECTION =>
+ handleEndOfDataSection()
+ null
+ }
+ }
+ } catch handleException
+ }
+ }
+ }
+
+ override protected def newWriterThread(
+ env: SparkEnv,
+ worker: Socket,
+ inputIterator: Iterator[ColumnarBatch],
+ partitionIndex: Int,
+ context: TaskContext): WriterThread = {
+ new WriterThread(env, worker, inputIterator, partitionIndex, context) {
+ override protected def writeCommand(dataOut: DataOutputStream): Unit = {
+ // Write config for the worker as a number of key -> value pairs of
strings
+ dataOut.writeInt(conf.size)
+ for ((k, v) <- conf) {
+ PythonRDD.writeUTF(k, dataOut)
+ PythonRDD.writeUTF(v, dataOut)
+ }
+ PythonUDFRunner.writeUDFs(dataOut, funcs, argOffsets)
+ }
+
+ override protected def writeIteratorToStream(dataOut: DataOutputStream):
Unit = {
+ var numRows: Long = 0
+ val arrowSchema = SparkSchemaUtil.toArrowSchema(schema, timeZoneId)
+ val allocator = ArrowBufferAllocators.contextInstance()
+ val root = VectorSchemaRoot.create(arrowSchema, allocator)
+
+ Utils.tryWithSafeFinally {
+ val loader = new VectorLoader(root)
+ val writer = new ArrowStreamWriter(root, null, dataOut)
+ writer.start()
+ while (inputIterator.hasNext) {
+ val nextBatch = inputIterator.next()
+ numRows += nextBatch.numRows
+
+ val cols = (0 until nextBatch.numCols).toList.map(
+ i =>
+ nextBatch
+ .asInstanceOf[ColumnarBatch]
+ .column(i)
+ .asInstanceOf[ArrowWritableColumnVector]
+ .getValueVector)
+ val nextRecordBatch =
+ SparkVectorUtil.toArrowRecordBatch(nextBatch.numRows, cols)
+ loader.load(nextRecordBatch)
+ writer.writeBatch()
+ if (nextRecordBatch != null) {
+ nextRecordBatch.close()
+ }
+ }
+ // end writes footer to the output stream and doesn't clean any
resources.
+ // It could throw exception if the output stream is closed, so it
should be
+ // in the try block.
+ writer.end()
+ } {
+ root.close()
+ // allocator can't be closed now or the data will be lost
+ // allocator.close()
+ }
+ }
+ }
+ }
+}
+
+case class ColumnarArrowEvalPythonExec(
+ udfs: Seq[PythonUDF],
+ resultAttrs: Seq[Attribute],
+ child: SparkPlan,
+ evalType: Int)
+ extends EvalPythonExec
+ with GlutenPlan {
+ override def supportsColumnar: Boolean = true
+ // TODO: add additional projection support by pre-project
+ // FIXME: incorrect metrics updater
+
+ override protected def evaluate(
+ funcs: Seq[ChainedPythonFunctions],
+ argOffsets: Array[Array[Int]],
+ iter: Iterator[InternalRow],
+ schema: StructType,
+ context: TaskContext): Iterator[InternalRow] = {
+ throw new IllegalStateException(
+ "ColumnarArrowEvalPythonExec doesn't support evaluate InternalRow.")
+ }
+
+ private val sessionLocalTimeZone = conf.sessionLocalTimeZone
+ private def getPythonRunnerConfMap(conf: SQLConf): Map[String, String] = {
+ val timeZoneConf = Seq(SQLConf.SESSION_LOCAL_TIMEZONE.key ->
conf.sessionLocalTimeZone)
+ val pandasColsByName = Seq(
+ SQLConf.PANDAS_GROUPED_MAP_ASSIGN_COLUMNS_BY_NAME.key ->
+ conf.pandasGroupedMapAssignColumnsByName.toString)
+ val arrowSafeTypeCheck = Seq(
+ SQLConf.PANDAS_ARROW_SAFE_TYPE_CONVERSION.key ->
+ conf.arrowSafeTypeConversion.toString)
+ Map(timeZoneConf ++ pandasColsByName ++ arrowSafeTypeCheck: _*)
+ }
+ private val pythonRunnerConf = getPythonRunnerConfMap(conf)
+
+ protected def evaluateColumnar(
+ funcs: Seq[ChainedPythonFunctions],
+ argOffsets: Array[Array[Int]],
+ iter: Iterator[ColumnarBatch],
+ schema: StructType,
+ context: TaskContext): Iterator[ColumnarBatch] = {
+
+ val outputTypes = output.drop(child.output.length).map(_.dataType)
+
+ val columnarBatchIter = new ColumnarArrowPythonRunner(
+ funcs,
+ evalType,
+ argOffsets,
+ schema,
+ sessionLocalTimeZone,
+ pythonRunnerConf).compute(iter, context.partitionId(), context)
+
+ columnarBatchIter.map {
+ batch =>
+ val actualDataTypes = (0 until batch.numCols()).map(i =>
batch.column(i).dataType())
+ assert(
+ outputTypes == actualDataTypes,
+ "Invalid schema from arrow_udf: " +
+ s"expected ${outputTypes.mkString(", ")}, got
${actualDataTypes.mkString(", ")}")
+ batch
+ }
+ }
+
+ private def collectFunctions(udf: PythonUDF): (ChainedPythonFunctions,
Seq[Expression]) = {
+ udf.children match {
+ case Seq(u: PythonUDF) =>
+ val (chained, children) = collectFunctions(u)
+ (ChainedPythonFunctions(chained.funcs ++ Seq(udf.func)), children)
+ case children =>
+ // There should not be any other UDFs, or the children can't be
evaluated directly.
+ assert(children.forall(_.find(_.isInstanceOf[PythonUDF]).isEmpty))
+ (ChainedPythonFunctions(Seq(udf.func)), udf.children)
+ }
+ }
+
+ override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
+ val inputRDD = child.executeColumnar()
+ inputRDD.mapPartitions {
+ iter =>
+ val context = TaskContext.get()
+ val (pyFuncs, inputs) = udfs.map(collectFunctions).unzip
+ // flatten all the arguments
+ val allInputs = new ArrayBuffer[Expression]
+ val dataTypes = new ArrayBuffer[DataType]
+ val argOffsets = inputs.map {
+ input =>
+ input.map {
+ e =>
+ if (allInputs.exists(_.semanticEquals(e))) {
+ allInputs.indexWhere(_.semanticEquals(e))
+ } else {
+ allInputs += e
+ dataTypes += e.dataType
+ allInputs.length - 1
+ }
+ }.toArray
+ }.toArray
+ val schema = StructType(dataTypes.zipWithIndex.map {
+ case (dt, i) =>
+ StructField(s"_$i", dt)
+ }.toSeq)
+ val contextAwareIterator = new ContextAwareIterator(context, iter)
+ val inputCbCache = new ArrayBuffer[ColumnarBatch]()
+ val inputBatchIter = contextAwareIterator.map {
+ inputCb =>
+
ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance, inputCb)
+ // 0. cache input for later merge
+ ColumnarBatches.retain(inputCb)
+ inputCbCache += inputCb
+ inputCb
+ }
+
+ val outputColumnarBatchIterator =
+ evaluateColumnar(pyFuncs, argOffsets, inputBatchIter, schema,
context)
+ val res =
+ outputColumnarBatchIterator.zipWithIndex.map {
+ case (outputCb, batchId) =>
+ val inputCb = inputCbCache(batchId)
+ val joinedVectors = (0 until inputCb.numCols).toArray.map(
+ i => inputCb.column(i)) ++ (0 until
outputCb.numCols).toArray.map(
+ i => outputCb.column(i))
+ val numRows = inputCb.numRows
+ val batch = new ColumnarBatch(joinedVectors, numRows)
+ val offloaded =
+
ColumnarBatches.ensureOffloaded(ArrowBufferAllocators.contextInstance, batch)
+ ColumnarBatches.release(outputCb)
+ offloaded
+ }
+ Iterators
+ .wrap(res)
+ .recycleIterator {
+ inputCbCache.foreach(ColumnarBatches.release(_))
+ }
+ .recyclePayload(_.close())
+ .create()
+ }
+ }
+ override protected def withNewChildInternal(newChild: SparkPlan):
ColumnarArrowEvalPythonExec =
+ copy(udfs, resultAttrs, newChild)
+}
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/execution/python/ArrowEvalPythonExecSuite.scala
b/backends-velox/src/test/scala/org/apache/gluten/execution/python/ArrowEvalPythonExecSuite.scala
new file mode 100644
index 000000000..2193448b4
--- /dev/null
+++
b/backends-velox/src/test/scala/org/apache/gluten/execution/python/ArrowEvalPythonExecSuite.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution.python
+
+import org.apache.gluten.execution.WholeStageTransformerSuite
+
+import org.apache.spark.SparkConf
+import org.apache.spark.api.python.ColumnarArrowEvalPythonExec
+import org.apache.spark.sql.IntegratedUDFTestUtils
+
+class ArrowEvalPythonExecSuite extends WholeStageTransformerSuite {
+
+ import IntegratedUDFTestUtils._
+ import testImplicits.localSeqToDatasetHolder
+ import testImplicits.newProductEncoder
+
+ override protected val resourcePath: String = "/tpch-data-parquet-velox"
+ override protected val fileFormat: String = "parquet"
+ val pyarrowTestUDF = TestScalarPandasUDF(name = "pyarrowUDF")
+
+ override def sparkConf: SparkConf = {
+ super.sparkConf
+ .set("spark.sql.shuffle.partitions", "1")
+ .set("spark.default.parallelism", "1")
+ .set("spark.executor.cores", "1")
+ }
+
+ test("arrow_udf test") {
+ lazy val base =
+ Seq(("1", 1), ("1", 2), ("2", 1), ("2", 2), ("3", 1), ("3", 2), ("0",
1), ("3", 0))
+ .toDF("a", "b")
+ lazy val expected = Seq(
+ ("1", "1"),
+ ("1", "1"),
+ ("2", "2"),
+ ("2", "2"),
+ ("3", "3"),
+ ("3", "3"),
+ ("0", "0"),
+ ("3", "3")
+ ).toDF("a", "p_a")
+
+ val df2 = base.select("a").withColumn("p_a", pyarrowTestUDF(base("a")))
+ checkSparkOperatorMatch[ColumnarArrowEvalPythonExec](df2)
+ checkAnswer(df2, expected)
+ }
+}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index 5e8b347b3..fa795b84b 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -142,4 +142,6 @@ trait BackendSettingsApi {
def shouldRewriteTypedImperativeAggregate(): Boolean = false
def shouldRewriteCollect(): Boolean = false
+
+ def supportColumnarArrowUdf(): Boolean = false
}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
index 1cffe3cb1..b1ca34676 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
@@ -359,6 +359,13 @@ trait SparkPlanExecApi {
options: Map[String, String],
staticPartitions: TablePartitionSpec): SparkPlan
+ /** Create ColumnarArrowEvalPythonExec, for velox backend */
+ def createColumnarArrowEvalPythonExec(
+ udfs: Seq[PythonUDF],
+ resultAttrs: Seq[Attribute],
+ child: SparkPlan,
+ evalType: Int): SparkPlan
+
/**
* Generate extended DataSourceV2 Strategies. Currently only for ClickHouse
backend.
*
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/TransformHintRule.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/TransformHintRule.scala
index 1bd0d3b93..df1c421b4 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/TransformHintRule.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/TransformHintRule.scala
@@ -38,7 +38,7 @@ import
org.apache.spark.sql.execution.datasources.WriteFilesExec
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.exchange._
import org.apache.spark.sql.execution.joins._
-import org.apache.spark.sql.execution.python.EvalPythonExec
+import org.apache.spark.sql.execution.python.{ArrowEvalPythonExec,
BatchEvalPythonExec}
import org.apache.spark.sql.execution.window.{WindowExec,
WindowGroupLimitExecShim}
import org.apache.spark.sql.hive.HiveTableScanExecTransformer
import org.apache.spark.sql.types.StringType
@@ -502,9 +502,20 @@ case class AddTransformHintRule() extends Rule[SparkPlan] {
plan.generatorOutput,
plan.child)
transformer.doValidate().tagOnFallback(plan)
- case plan: EvalPythonExec =>
+ case plan: BatchEvalPythonExec =>
val transformer = EvalPythonExecTransformer(plan.udfs,
plan.resultAttrs, plan.child)
transformer.doValidate().tagOnFallback(plan)
+ case plan: ArrowEvalPythonExec =>
+ // When backend doesn't support ColumnarArrow or columnar arrow configuration not
enabled, we will try offloading through EvalPythonExecTransformer
+ // enabled, we will try offloading through EvalPythonExecTransformer
+ if (
+ !BackendsApiManager.getSettings.supportColumnarArrowUdf() ||
+ !GlutenConfig.getConf.enableColumnarArrowUDF
+ ) {
+ // Both CH and Velox will try using backend's built-in functions
for calculation
+ val transformer = EvalPythonExecTransformer(plan.udfs,
plan.resultAttrs, plan.child)
+ transformer.doValidate().tagOnFallback(plan)
+ }
case plan: TakeOrderedAndProjectExec =>
val (limit, offset) =
SparkShimLoader.getSparkShims.getLimitAndOffsetFromTopK(plan)
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/TransformSingleNode.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/TransformSingleNode.scala
index bc1276d63..b8f99330e 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/TransformSingleNode.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/TransformSingleNode.scala
@@ -36,7 +36,7 @@ import
org.apache.spark.sql.execution.datasources.WriteFilesExec
import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan}
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec,
ShuffleExchangeExec}
import org.apache.spark.sql.execution.joins._
-import org.apache.spark.sql.execution.python.EvalPythonExec
+import org.apache.spark.sql.execution.python.{ArrowEvalPythonExec,
BatchEvalPythonExec}
import org.apache.spark.sql.execution.window.{WindowExec,
WindowGroupLimitExecShim}
import org.apache.spark.sql.hive.HiveTableScanExecTransformer
@@ -439,10 +439,24 @@ object TransformOthers {
plan.outer,
plan.generatorOutput,
child)
- case plan: EvalPythonExec =>
+ case plan: BatchEvalPythonExec =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently
supported.")
val child = plan.child
EvalPythonExecTransformer(plan.udfs, plan.resultAttrs, child)
+ case plan: ArrowEvalPythonExec =>
+ logDebug(s"Columnar Processing for ${plan.getClass} is currently
supported.")
+ val child = plan.child
+ // For ArrowEvalPythonExec, CH supports it through
EvalPythonExecTransformer while
+ // Velox backend uses ColumnarArrowEvalPythonExec.
+ if (!BackendsApiManager.getSettings.supportColumnarArrowUdf()) {
+ EvalPythonExecTransformer(plan.udfs, plan.resultAttrs, child)
+ } else {
+
BackendsApiManager.getSparkPlanExecApiInstance.createColumnarArrowEvalPythonExec(
+ plan.udfs,
+ plan.resultAttrs,
+ child,
+ plan.evalType)
+ }
case p if !p.isInstanceOf[GlutenPlan] =>
logDebug(s"Transformation for ${p.getClass} is currently not
supported.")
val children = plan.children
diff --git
a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/python/BasePythonRunnerShim.scala
b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/python/BasePythonRunnerShim.scala
new file mode 100644
index 000000000..0ec04defd
--- /dev/null
+++
b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/python/BasePythonRunnerShim.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.python
+
+import org.apache.spark.api.python.{BasePythonRunner, ChainedPythonFunctions}
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+abstract class BasePythonRunnerShim(
+ funcs: Seq[ChainedPythonFunctions],
+ evalType: Int,
+ argOffsets: Array[Array[Int]])
+ extends BasePythonRunner[ColumnarBatch, ColumnarBatch](funcs, evalType,
argOffsets) {}
diff --git
a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/python/BasePythonRunnerShim.scala
b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/python/BasePythonRunnerShim.scala
new file mode 100644
index 000000000..0ec04defd
--- /dev/null
+++
b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/python/BasePythonRunnerShim.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.python
+
+import org.apache.spark.api.python.{BasePythonRunner, ChainedPythonFunctions}
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+abstract class BasePythonRunnerShim(
+ funcs: Seq[ChainedPythonFunctions],
+ evalType: Int,
+ argOffsets: Array[Array[Int]])
+ extends BasePythonRunner[ColumnarBatch, ColumnarBatch](funcs, evalType,
argOffsets) {}
diff --git
a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/python/BasePythonRunnerShim.scala
b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/python/BasePythonRunnerShim.scala
new file mode 100644
index 000000000..0ec04defd
--- /dev/null
+++
b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/python/BasePythonRunnerShim.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.python
+
+import org.apache.spark.api.python.{BasePythonRunner, ChainedPythonFunctions}
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+abstract class BasePythonRunnerShim(
+ funcs: Seq[ChainedPythonFunctions],
+ evalType: Int,
+ argOffsets: Array[Array[Int]])
+ extends BasePythonRunner[ColumnarBatch, ColumnarBatch](funcs, evalType,
argOffsets) {}
diff --git
a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/python/BasePythonRunnerShim.scala
b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/python/BasePythonRunnerShim.scala
new file mode 100644
index 000000000..0685b3792
--- /dev/null
+++
b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/python/BasePythonRunnerShim.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.python
+
+import org.apache.spark.api.python.{BasePythonRunner, ChainedPythonFunctions}
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+abstract class BasePythonRunnerShim(
+ funcs: Seq[ChainedPythonFunctions],
+ evalType: Int,
+ argOffsets: Array[Array[Int]])
+ extends BasePythonRunner[ColumnarBatch, ColumnarBatch](funcs, evalType,
argOffsets, None) {}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]