This is an automated email from the ASF dual-hosted git repository.
chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 802e284f0a [GLUTEN-10472][VL] feat: Support columnar partial generate
for HiveUDTF (#10475)
802e284f0a is described below
commit 802e284f0ad88499e5bc206a4cf7a47e9f727cdc
Author: jiangjiangtian <[email protected]>
AuthorDate: Mon Sep 15 04:11:35 2025 +0800
[GLUTEN-10472][VL] feat: Support columnar partial generate for HiveUDTF
(#10475)
---
.../VeloxColumnarBatchJniWrapper.java | 3 +
.../gluten/columnarbatch/VeloxColumnarBatches.java | 18 +
.../gluten/backendsapi/velox/VeloxRuleApi.scala | 2 +
.../execution/ColumnarPartialGenerateExec.scala | 402 +++++++++++++++++++++
.../gluten/extension/PartialGenerateRule.scala | 63 ++++
.../apache/gluten/udtf/ConditionalOutputUDTF.java | 69 ++++
.../java/org/apache/gluten/udtf/CustomerUDTF.java | 82 +++++
.../java/org/apache/gluten/udtf/NoInputUDTF.java | 59 +++
.../java/org/apache/gluten/udtf/SimpleUDTF.java | 65 ++++
.../spark/sql/execution/GlutenHiveUDFSuite.scala | 78 +++-
cpp/velox/jni/VeloxJniWrapper.cc | 60 +++
docs/Configuration.md | 29 +-
.../vectorized/ArrowWritableColumnVector.java | 199 +++++++++-
.../expression/InterpretedArrowGenerate.scala | 60 +++
.../gluten/vectorized/ArrowColumnarRow.scala | 10 +
.../org/apache/gluten/config/GlutenConfig.scala | 8 +
16 files changed, 1171 insertions(+), 36 deletions(-)
diff --git
a/backends-velox/src/main/java/org/apache/gluten/columnarbatch/VeloxColumnarBatchJniWrapper.java
b/backends-velox/src/main/java/org/apache/gluten/columnarbatch/VeloxColumnarBatchJniWrapper.java
index 2307cf2a57..fb2a5625c6 100644
---
a/backends-velox/src/main/java/org/apache/gluten/columnarbatch/VeloxColumnarBatchJniWrapper.java
+++
b/backends-velox/src/main/java/org/apache/gluten/columnarbatch/VeloxColumnarBatchJniWrapper.java
@@ -36,6 +36,9 @@ public class VeloxColumnarBatchJniWrapper implements
RuntimeAware {
public native long slice(long veloxBatchHandle, int offset, int limit);
+ /**
+ * Native entry point used by VeloxColumnarBatches#repeatedThenCompose, which passes the native
+ * handles of two batches and wraps the returned handle with ColumnarBatches.create.
+ * NOTE(review): presumably rowId2RowNums[i] is the number of output rows produced for row i of
+ * repeatedBatch - confirm against the JNI implementation.
+ */
+ public native long repeatedThenCompose(
+ long repeatedBatch, long nonRepeatedBatch, int[] rowId2RowNums);
+
@Override
public long rtHandle() {
return runtime.getHandle();
diff --git
a/backends-velox/src/main/java/org/apache/gluten/columnarbatch/VeloxColumnarBatches.java
b/backends-velox/src/main/java/org/apache/gluten/columnarbatch/VeloxColumnarBatches.java
index 71f40f3424..41c10324dc 100644
---
a/backends-velox/src/main/java/org/apache/gluten/columnarbatch/VeloxColumnarBatches.java
+++
b/backends-velox/src/main/java/org/apache/gluten/columnarbatch/VeloxColumnarBatches.java
@@ -141,4 +141,22 @@ public final class VeloxColumnarBatches {
return ColumnarBatches.create(handle);
}
}
+
+ /**
+ * Repeats the rows of {@code batch1} as described by {@code rowId2RowNums} and then composes
+ * the repeated batch with {@code batch2}.
+ *
+ * @param batch1 the batch whose rows are repeated
+ * @param batch2 the batch composed with the repeated {@code batch1}
+ * @param rowId2RowNums records the number of each row after repeated
+ * @return the batch created from the handle returned by the native call
+ */
+ public static ColumnarBatch repeatedThenCompose(
+     ColumnarBatch batch1, ColumnarBatch batch2, int[] rowId2RowNums) {
+   final String backendName = BackendsApiManager.getBackendName();
+   final Runtime runtime =
+       Runtimes.contextInstance(backendName, "VeloxColumnarBatches#repeatedThenCompose");
+   final VeloxColumnarBatchJniWrapper jniWrapper = VeloxColumnarBatchJniWrapper.create(runtime);
+   final long repeatedHandle = ColumnarBatches.getNativeHandle(backendName, batch1);
+   final long nonRepeatedHandle = ColumnarBatches.getNativeHandle(backendName, batch2);
+   final long composedHandle =
+       jniWrapper.repeatedThenCompose(repeatedHandle, nonRepeatedHandle, rowId2RowNums);
+   return ColumnarBatches.create(composedHandle);
+ }
}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
index 9ef657c5d5..45abbd5a55 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
@@ -107,6 +107,7 @@ object VeloxRuleApi {
injector.injectPostTransform(_ =>
AppendBatchResizeForShuffleInputAndOutput())
injector.injectPostTransform(_ => UnionTransformerRule())
injector.injectPostTransform(c => PartialProjectRule.apply(c.session))
+ injector.injectPostTransform(_ => PartialGenerateRule())
injector.injectPostTransform(_ => RemoveNativeWriteFilesSortAndProject())
injector.injectPostTransform(_ => PushDownFilterToScan)
injector.injectPostTransform(_ => PushDownInputFileExpression.PostOffload)
@@ -207,6 +208,7 @@ object VeloxRuleApi {
injector.injectPostTransform(_ => RemoveTransitions)
injector.injectPostTransform(_ => UnionTransformerRule())
injector.injectPostTransform(c => PartialProjectRule.apply(c.session))
+ injector.injectPostTransform(_ => PartialGenerateRule())
injector.injectPostTransform(_ => RemoveNativeWriteFilesSortAndProject())
injector.injectPostTransform(_ => PushDownFilterToScan)
injector.injectPostTransform(_ => PushDownInputFileExpression.PostOffload)
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialGenerateExec.scala
b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialGenerateExec.scala
new file mode 100644
index 0000000000..ea1be35995
--- /dev/null
+++
b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialGenerateExec.scala
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.columnarbatch.{ColumnarBatches, VeloxColumnarBatches}
+import org.apache.gluten.expression.InterpretedArrowGenerate
+import org.apache.gluten.extension.columnar.transition.Convention
+import org.apache.gluten.iterator.Iterators
+import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
+import org.apache.gluten.sql.shims.SparkShimLoader
+import org.apache.gluten.vectorized.{ArrowColumnarRow,
ArrowWritableColumnVector}
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference, Expression, GenericInternalRow, Nondeterministic,
SpecializedGetters}
+import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.execution.{ExplainUtils, GenerateExec, SparkPlan,
UnaryExecNode}
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.sql.types.{ArrayType, BinaryType, DataType, MapType,
StringType, StructType}
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
+
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
+
+/**
+ * By rule <PartialGenerateRule>, if the generator is an instance of <HiveGenericUDTF>, then the
+ * GenerateExec will be replaced by ColumnarPartialGenerateExec.
+ *
+ * @param generateExec
+ * the GenerateExec from vanilla
+ * @param child
+ * child plan
+ */
+case class ColumnarPartialGenerateExec(generateExec: GenerateExec, child:
SparkPlan)
+ extends UnaryExecNode
+ with ValidatablePlan {
+
+ // Row with one slot per generator output column. It is appended in place of generator output
+ // when generateExec.outer is set and the generator yields nothing for an input row.
+ private val generatorNullRow = new
GenericInternalRow(generateExec.generatorOutput.length)
+
+ // Attributes (and their positions in child.output) that are passed through from the child,
+ // followed by the attributes/positions referenced by the generator expression itself.
+ // All four buffers are filled by the getColumnIndexInChildOutput calls below.
+ private val pruneChildAttributes: ListBuffer[Attribute] = ListBuffer()
+ private val pruneChildColumnIndices: ListBuffer[Int] = ListBuffer()
+ private val generatorUsedAttributes: ListBuffer[Attribute] = ListBuffer()
+ private val generatorUsedColumnIndices: ListBuffer[Int] = ListBuffer()
+
+ // Validation flags set by getColumnIndexInChildOutput and read by doValidateInternal.
+ private var attrNotExists = false
+ private var hasUnsupportedDataType = false
+
+ // Schema of the generator output; used when allocating the Arrow result vectors.
+ private val rightSchema =
+
SparkShimLoader.getSparkShims.structFromAttributes(generateExec.generatorOutput)
+
+ // Constructor-time resolution: these calls must stay after the buffer and flag declarations
+ // above because they write to them.
+ getColumnIndexInChildOutput(
+ pruneChildAttributes,
+ pruneChildColumnIndices,
+ generateExec.requiredChildOutput)
+ getColumnIndexInChildOutput(
+ generatorUsedAttributes,
+ generatorUsedColumnIndices,
+ Seq(generateExec.generator))
+
+ // Generator bound against generatorUsedAttributes, i.e. against the column order of the batch
+ // selected with generatorUsedColumnIndices. Lazy, so it is built on first use.
+ private lazy val generator = InterpretedArrowGenerate.create(
+ bindReferences(Seq(generateExec.generator),
generatorUsedAttributes.toSeq).head)
+
+ // Timing metrics: total operator time plus the two format-conversion directions.
+ @transient override lazy val metrics = Map(
+ "time" -> SQLMetrics.createTimingMetric(sparkContext, "total time of
partial generate"),
+ "velox_to_arrow_time" -> SQLMetrics.createTimingMetric(
+ sparkContext,
+ "time of velox to Arrow ColumnarBatch"),
+ "arrow_to_velox_time" -> SQLMetrics.createTimingMetric(
+ sparkContext,
+ "time of Arrow ColumnarBatch to velox")
+ )
+
+ /**
+  * Walks `exprs` recursively and, for every attribute reference, records the attribute and its
+  * position in `child.output` (each position at most once).
+  *
+  * Side effects: sets `attrNotExists` when a reference cannot be found in the child output and
+  * `hasUnsupportedDataType` when schema validation rejects the attribute's data type.
+  *
+  * @param attributes
+  *   buffer receiving the resolved attributes
+  * @param indices
+  *   buffer receiving the matching positions in `child.output`
+  * @param exprs
+  *   expressions to scan
+  */
+ private def getColumnIndexInChildOutput(
+     attributes: ListBuffer[Attribute],
+     indices: ListBuffer[Int],
+     exprs: Seq[Expression]): Unit = {
+   for (expr <- exprs) {
+     expr match {
+       case ref: AttributeReference =>
+         val position = child.output.indexWhere(_.exprId.equals(ref.exprId))
+         if (position >= 0) {
+           val rejected = BackendsApiManager.getValidatorApiInstance
+             .doSchemaValidate(ref.dataType)
+             .isDefined
+           if (rejected) {
+             log.debug(s"Expression $ref contains unsupported data type ${ref.dataType}")
+             hasUnsupportedDataType = true
+           } else if (!indices.contains(position)) {
+             attributes.append(ref)
+             indices.append(position)
+           }
+         } else {
+           attrNotExists = true
+           log.debug(s"Couldn't find $ref in ${child.output.attrs.mkString("[", ",", "]")}")
+         }
+       case other =>
+         // Not a leaf reference: descend into the sub-expressions.
+         getColumnIndexInChildOutput(attributes, indices, other.children)
+     }
+   }
+ }
+
+ // Reports the child's partitioning unchanged.
+ override def outputPartitioning(): Partitioning = child.outputPartitioning
+
+ /** Canonical form: both the wrapped GenerateExec and the child are canonicalized. */
+ override protected def doCanonicalize(): ColumnarPartialGenerateExec =
+   copy(
+     generateExec = generateExec.canonicalized.asInstanceOf[GenerateExec],
+     child = child.canonicalized)
+
+ /**
+  * Reports the outcome of the attribute resolution done at construction time: fails when a
+  * referenced attribute was not found in the child output or has an unsupported data type.
+  */
+ override protected def doValidateInternal(): ValidationResult = {
+   if (attrNotExists) {
+     ValidationResult.failed("Attribute in the generator does not exists in its child")
+   } else if (hasUnsupportedDataType) {
+     ValidationResult.failed("Attribute in the generator contains unsupported type")
+   } else {
+     ValidationResult.succeeded
+   }
+ }
+
+ // Executes the child columnar plan and, per partition, turns every non-empty input batch into
+ // an iterator of generated batches (see getGeneratedResultVeloxArrow).
+ override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
+ val totalTime = longMetric("time")
+ val v2a = longMetric("velox_to_arrow_time")
+ val a2v = longMetric("arrow_to_velox_time")
+ child.executeColumnar().mapPartitionsWithIndex {
+ (index, batches) =>
+ // Nondeterministic expressions inside the bound generator are initialized with the
+ // partition index before any row is evaluated.
+ generator.generator.foreach {
+ case n: Nondeterministic => n.initialize(index)
+ case _ =>
+ }
+ // One inner iterator per input batch; flattened below.
+ val res: Iterator[Iterator[ColumnarBatch]] = new
Iterator[Iterator[ColumnarBatch]] {
+ override def hasNext: Boolean = batches.hasNext
+
+ override def next(): Iterator[ColumnarBatch] = {
+ val batch = batches.next()
+ // NOTE(review): zero-row batches are skipped entirely, and generator.terminate() is only
+ // invoked inside getGeneratedResultVeloxArrow. If the last batch of a partition has
+ // zero rows, terminate() is therefore never called - confirm this is intended.
+ if (batch.numRows() == 0) {
+ Iterator.empty
+ } else {
+ val start = System.currentTimeMillis()
+ // Split the input into the pass-through columns and the columns the generator reads.
+ val pruneChildInputData = ColumnarBatches
+ .select(BackendsApiManager.getBackendName, batch,
pruneChildColumnIndices.toArray)
+ val generatorUsedInputData = ColumnarBatches
+ .select(
+ BackendsApiManager.getBackendName,
+ batch,
+ generatorUsedColumnIndices.toArray)
+ try {
+ // batches.hasNext tells the callee whether this is the last batch of the partition.
+ val generatedBatch =
+ getGeneratedResultVeloxArrow(
+ pruneChildInputData,
+ generatorUsedInputData,
+ batches.hasNext,
+ v2a,
+ a2v)
+
+ totalTime += System.currentTimeMillis() - start
+ generatedBatch
+ } finally {
+ // Both selected batches are closed here as soon as the callee returns, i.e. before
+ // the returned iterator is consumed.
+ pruneChildInputData.close()
+ generatorUsedInputData.close()
+ }
+ }
+ }
+ }
+ // recyclePayload closes each emitted batch through the iterator wrapper.
+ Iterators
+ .wrap(res.flatten)
+ .protectInvocationFlow()
+ .recyclePayload(_.close())
+ .create()
+ }
+ }
+
+ /**
+  * Loads `inputData` through the context Arrow allocator. A batch without columns is returned
+  * as-is, so the result may be the very same object as the argument.
+  */
+ private def loadArrowBatch(inputData: ColumnarBatch): ColumnarBatch =
+   inputData.numCols() match {
+     case 0 => inputData
+     case _ => ColumnarBatches.load(ArrowBufferAllocators.contextInstance(), inputData)
+   }
+
+ /**
+  * Returns true when `dt` is a binary or string type, or a struct/array/map that contains one
+  * at any nesting depth.
+  */
+ private def isVariableWidthType(dt: DataType): Boolean = dt match {
+   case BinaryType | StringType => true
+   case struct: StructType => struct.fields.exists(f => isVariableWidthType(f.dataType))
+   case array: ArrayType => isVariableWidthType(array.elementType)
+   case map: MapType => isVariableWidthType(map.keyType) || isVariableWidthType(map.valueType)
+   case _ => false
+ }
+
+ /**
+  * Builds a function that computes the size of the value stored at a given ordinal of a
+  * row-like container (`SpecializedGetters`) for data type `dt`.
+  *
+  * Sizing rules, as implemented below:
+  *   - null values and fixed-width types count as 0;
+  *   - binary and string values count their byte length;
+  *   - arrays sum the sizes of their elements;
+  *   - structs take the largest size among their fields (0 for a struct without fields);
+  *   - maps take the larger of the summed key sizes and the summed value sizes.
+  *
+  * @param dt
+  *   data type of the field to measure
+  * @return
+  *   function from (container, ordinal) to the computed size
+  */
+ private def getFieldSize(dt: DataType): (SpecializedGetters, Int) => Long = {
+   val sizeOfNonNull: (SpecializedGetters, Int) => Long = dt match {
+     case BinaryType => (input, ordinal) => input.getBinary(ordinal).length
+     case StringType => (input, ordinal) => input.getUTF8String(ordinal).numBytes
+     case StructType(fields) =>
+       val fieldSizers = fields.map(field => getFieldSize(field.dataType))
+       (input, ordinal) => {
+         val structData = input.getStruct(ordinal, fields.length)
+         // Sizes are never negative, so seeding the fold with 0 gives the same result as `max`
+         // for non-empty structs while avoiding the "empty.max" exception that `max` raises for
+         // a struct without fields (reachable e.g. through map<string, struct<>>).
+         fieldSizers.indices.foldLeft(0L) {
+           (largest, fieldIdx) => Math.max(largest, fieldSizers(fieldIdx)(structData, fieldIdx))
+         }
+       }
+     case ArrayType(elementType, _) =>
+       val elementSizer = getFieldSize(elementType)
+       (input, ordinal) => {
+         val arrayData = input.getArray(ordinal)
+         var total = 0L
+         for (elementIdx <- 0 until arrayData.numElements()) {
+           total = total + elementSizer(arrayData, elementIdx)
+         }
+         total
+       }
+     case MapType(keyType, valueType, _) =>
+       val keySizer = getFieldSize(keyType)
+       val valueSizer = getFieldSize(valueType)
+       (input, ordinal) => {
+         val mapData = input.getMap(ordinal)
+         val keyArray = mapData.keyArray()
+         val valueArray = mapData.valueArray()
+         var keyTotal = 0L
+         var valueTotal = 0L
+         for (entryIdx <- 0 until mapData.numElements()) {
+           keyTotal = keyTotal + keySizer(keyArray, entryIdx)
+           valueTotal = valueTotal + valueSizer(valueArray, entryIdx)
+         }
+         Math.max(keyTotal, valueTotal)
+       }
+     case _ => (_, _) => 0L // For fixed-width datatype, we let the size be 0.
+   }
+   // Null check comes first so the typed getters above are never called on a null slot.
+   (input: SpecializedGetters, ordinal) => {
+     if (input.isNullAt(ordinal)) {
+       0L
+     } else {
+       sizeOfNonNull(input, ordinal)
+     }
+   }
+ }
+
+ // One size function per generator output column, in output order.
+ private val fieldsSizeGetter = generateExec.generatorOutput.map {
+ attribute => getFieldSize(attribute.dataType)
+ }.toArray
+
+ // Ordinals of the generator output columns whose type is (or contains) binary/string data;
+ // only these columns are measured before the result vectors are allocated.
+ private val variableWidthFields = generateExec.generatorOutput.zipWithIndex
+ .filter(tuple => isVariableWidthType(tuple._1.dataType))
+ .map(_._2)
+ .toArray
+
+ // Writes one generated row into the Arrow-backed target row by delegating to
+ // ArrowColumnarRow.writeRowUnsafe.
+ private def writeRowUnsafe(rightRow: InternalRow, rightTargetRow:
ArrowColumnarRow): Unit = {
+ rightTargetRow.writeRowUnsafe(rightRow)
+ }
+
+ /**
+  * Offloads the generated Arrow vectors to a Velox batch and combines it with the pass-through
+  * child columns.
+  *
+  * When the generator output has columns, `leftInputData` is repeated according to
+  * `rowId2RowNum` and composed with the generated batch. Otherwise `leftInputData` itself is
+  * retained and returned. The time spent is added to `a2v`.
+  *
+  * The intermediate Velox batch is closed in a `finally` block so it is released even when the
+  * compose or retain call throws.
+  *
+  * @param rightResultVectors
+  *   Arrow vectors holding the generator output
+  * @param resultLength
+  *   number of generated rows
+  * @param leftInputData
+  *   batch with the pass-through child columns
+  * @param rowId2RowNum
+  *   number of generated rows for each input row
+  * @param a2v
+  *   metric accumulating the Arrow-to-Velox conversion time
+  * @return
+  *   the combined batch, or the retained `leftInputData`
+  */
+ private def getResultColumnarBatch(
+     rightResultVectors: Array[ArrowWritableColumnVector],
+     resultLength: Int,
+     leftInputData: ColumnarBatch,
+     rowId2RowNum: Array[Int],
+     a2v: SQLMetric): ColumnarBatch = {
+   val rightTargetBatch =
+     new ColumnarBatch(rightResultVectors.map(_.asInstanceOf[ColumnVector]), resultLength)
+   val start = System.currentTimeMillis()
+   val rightVeloxBatch = VeloxColumnarBatches.toVeloxBatch(
+     ColumnarBatches.offload(ArrowBufferAllocators.contextInstance(), rightTargetBatch))
+   val resultBatch =
+     try {
+       if (rightVeloxBatch.numCols() != 0) {
+         VeloxColumnarBatches.repeatedThenCompose(leftInputData, rightVeloxBatch, rowId2RowNum)
+       } else {
+         // Nothing to compose: hand back the left batch with an extra reference for the caller.
+         ColumnarBatches.retain(leftInputData)
+         leftInputData
+       }
+     } finally {
+       rightVeloxBatch.close()
+     }
+   a2v += System.currentTimeMillis() - start
+   resultBatch
+ }
+
+ /**
+  * Evaluates the generator on every row of `generatorUsedInputData` and combines the generated
+  * rows with the matching rows of `pruneChildInputData`.
+  *
+  * Ownership: both input batches belong to the caller, which closes them once this method
+  * returns. This method therefore never closes them itself (the earlier empty-result path did,
+  * causing each to be closed twice); it only releases resources it creates.
+  *
+  * NOTE(review): the caller only invokes this for batches with at least one row; with zero rows
+  * the terminate branch below would index position -1.
+  *
+  * @param pruneChildInputData
+  *   pass-through child columns of the current batch
+  * @param generatorUsedInputData
+  *   columns read by the generator, in bound-reference order
+  * @param hasNext
+  *   whether more input batches follow; when false, `generator.terminate()` is invoked
+  * @param v2a
+  *   metric accumulating the Velox-to-Arrow conversion time
+  * @param a2v
+  *   metric accumulating the Arrow-to-Velox conversion time
+  * @return
+  *   an iterator with a single result batch, or an empty iterator when nothing was generated
+  */
+ private def getGeneratedResultVeloxArrow(
+     pruneChildInputData: ColumnarBatch,
+     generatorUsedInputData: ColumnarBatch,
+     hasNext: Boolean,
+     v2a: SQLMetric,
+     a2v: SQLMetric): Iterator[ColumnarBatch] = {
+   val numRows = generatorUsedInputData.numRows()
+   val start = System.currentTimeMillis()
+   val rightArrowBatch = loadArrowBatch(generatorUsedInputData)
+   v2a += System.currentTimeMillis() - start
+
+   // loadArrowBatch returns its argument unchanged for zero-column input. That batch belongs
+   // to the caller, so only a separately loaded Arrow batch is released here.
+   def releaseArrowBatch(): Unit = {
+     if (rightArrowBatch ne generatorUsedInputData) {
+       rightArrowBatch.close()
+     }
+   }
+
+   // rowId2RowNum(i) is the number of generated rows attributed to input row i.
+   val rowId2RowNum = Array.fill(numRows)(0)
+   val rowResults = new ArrayBuffer[InternalRow]()
+   var inputRowId = 0
+   while (inputRowId < numRows) {
+     generator.apply(rightArrowBatch.getRow(inputRowId)) match {
+       case Some(resultRows) =>
+         rowResults ++= resultRows
+         rowId2RowNum(inputRowId) = resultRows.size
+       case None if generateExec.outer =>
+         // Outer generate: keep the input row, paired with an empty generator row.
+         rowResults.append(generatorNullRow)
+         rowId2RowNum(inputRowId) = 1
+       case None =>
+     }
+     inputRowId += 1
+   }
+   if (!hasNext) {
+     // Rows emitted on termination are attributed to the last input row of this batch.
+     generator.terminate().foreach {
+       resultRows =>
+         rowResults ++= resultRows
+         rowId2RowNum(numRows - 1) += resultRows.size
+     }
+   }
+
+   if (rowResults.isEmpty) {
+     releaseArrowBatch()
+     Iterator.empty
+   } else {
+     // Pre-compute the bytes needed per variable-width column so the vectors can be sized.
+     val colSizes = Array.fill(generateExec.generatorOutput.length)(0L)
+     rowResults.foreach {
+       row =>
+         for (i <- variableWidthFields) {
+           colSizes(i) = colSizes(i) + fieldsSizeGetter(i)(row, i)
+         }
+     }
+
+     val rightResultVectors: Array[ArrowWritableColumnVector] =
+       ArrowWritableColumnVector.allocateColumns(rowResults.length, colSizes, rightSchema)
+     val rightTargetRow = new ArrowColumnarRow(rightResultVectors)
+
+     rowResults.foreach(row => writeRowUnsafe(row, rightTargetRow))
+     rightTargetRow.finishWriteRow()
+
+     val resultBatch =
+       getResultColumnarBatch(
+         rightResultVectors,
+         rowResults.length,
+         pruneChildInputData,
+         rowId2RowNum,
+         a2v)
+
+     Iterators
+       .wrap(Iterator.single(resultBatch))
+       .recycleIterator({
+         releaseArrowBatch()
+         rightResultVectors.foreach(_.close())
+       })
+       .create()
+   }
+ }
+
+ // Multi-line explain output: node name, output attributes, child output and the wrapped
+ // GenerateExec.
+ override def verboseStringWithOperatorId(): String = {
+ s"""
+ |$formattedNodeName
+ |${ExplainUtils.generateFieldString("Output", output)}
+ |${ExplainUtils.generateFieldString("Input", child.output)}
+ |${ExplainUtils.generateFieldString("GenerateExec", generateExec)}
+ |""".stripMargin
+ }
+
+ // One-line description: the default string followed by the wrapped GenerateExec.
+ override def simpleString(maxFields: Int): String =
+ super.simpleString(maxFields) + " PartialGenerate " + generateExec
+
+ // Emits batches in the backend's primary batch type.
+ override def batchType(): Convention.BatchType =
BackendsApiManager.getSettings.primaryBatchType
+
+ // No row-based output convention is declared for this operator.
+ override def rowType0(): Convention.RowType = Convention.RowType.None
+
+ // Row-based execution is not available; only doExecuteColumnar is implemented.
+ final override def doExecute(): RDD[InternalRow] = {
+ throw new UnsupportedOperationException(
+ s"${this.getClass.getSimpleName} doesn't support doExecute")
+ }
+
+ // Output attributes are exactly those of the wrapped GenerateExec.
+ override def output: Seq[Attribute] = generateExec.output
+
+ // Replaces only the child; the wrapped GenerateExec is kept as-is.
+ override protected def withNewChildInternal(newChild: SparkPlan):
ColumnarPartialGenerateExec = {
+ copy(child = newChild)
+ }
+}
+
+object ColumnarPartialGenerateExec {
+
+  /** Wraps `original` together with its own child into a ColumnarPartialGenerateExec. */
+  def create(original: GenerateExec): ColumnarPartialGenerateExec =
+    new ColumnarPartialGenerateExec(generateExec = original, child = original.child)
+}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/extension/PartialGenerateRule.scala
b/backends-velox/src/main/scala/org/apache/gluten/extension/PartialGenerateRule.scala
new file mode 100644
index 0000000000..5e641862e4
--- /dev/null
+++
b/backends-velox/src/main/scala/org/apache/gluten/extension/PartialGenerateRule.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension
+
+import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.execution.{ColumnarPartialGenerateExec,
GenerateExecTransformer}
+import org.apache.gluten.utils.PlanUtil
+
+import org.apache.spark.sql.catalyst.expressions.UserDefinedExpression
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{GenerateExec, SparkPlan}
+
+/**
+ * Replaces eligible GenerateExec nodes with ColumnarPartialGenerateExec when
+ * `enableColumnarPartialGenerate` is switched on.
+ *
+ * A GenerateExec is considered when its child is a Gluten columnar operator and it is either
+ * the plan root or a child of a Gluten columnar operator. It is replaced only if its generator
+ * is not supported by GenerateExecTransformer, is a UserDefinedExpression, and the new node
+ * passes validation.
+ */
+case class PartialGenerateRule() extends Rule[SparkPlan] {
+
+  override def apply(plan: SparkPlan): SparkPlan = {
+    if (GlutenConfig.get.enableColumnarPartialGenerate) {
+      // The traversal below rewrites GenerateExec nodes through their parent, so the root of
+      // the plan is handled up front.
+      replaceIfEligible(plan).transformUp {
+        case parent: SparkPlan
+            if parent.children.exists(_.isInstanceOf[GenerateExec]) &&
+              PlanUtil.isGlutenColumnarOp(parent) =>
+          parent.mapChildren(replaceIfEligible)
+      }
+    } else {
+      plan
+    }
+  }
+
+  /** Attempts the replacement for a GenerateExec sitting on a Gluten columnar child. */
+  private def replaceIfEligible(node: SparkPlan): SparkPlan = node match {
+    case generate: GenerateExec if PlanUtil.isGlutenColumnarOp(generate.child) =>
+      tryAddColumnarPartialGenerateExec(generate)
+    case other => other
+  }
+
+  /** Returns the validated replacement, or `plan` itself when it does not qualify. */
+  private def tryAddColumnarPartialGenerateExec(plan: GenerateExec): SparkPlan = {
+    val nativelySupported = GenerateExecTransformer.supportsGenerate(plan.generator)
+    if (nativelySupported || !plan.generator.isInstanceOf[UserDefinedExpression]) {
+      plan
+    } else {
+      val candidate = ColumnarPartialGenerateExec.create(plan)
+      if (candidate.doValidate().ok()) candidate else plan
+    }
+  }
+}
diff --git
a/backends-velox/src/test/java/org/apache/gluten/udtf/ConditionalOutputUDTF.java
b/backends-velox/src/test/java/org/apache/gluten/udtf/ConditionalOutputUDTF.java
new file mode 100644
index 0000000000..fcbfc91a55
--- /dev/null
+++
b/backends-velox/src/test/java/org/apache/gluten/udtf/ConditionalOutputUDTF.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.udtf;
+
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Test UDTF taking a single argument. It forwards the argument as a one-column row
+ * ("longResult") when the value is even, and forwards nothing for null or odd values.
+ */
+public class ConditionalOutputUDTF extends GenericUDTF {
+
+  static final ObjectInspector LONG_TYPE = PrimitiveObjectInspectorFactory.javaLongObjectInspector;
+
+  private PrimitiveObjectInspector arg0OI = null;
+
+  @Override
+  public void close() throws HiveException {}
+
+  @Override
+  public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
+    // Input: exactly one argument is accepted.
+    if (argOIs.length != 1) {
+      throw new UDFArgumentException(getClass().getSimpleName() + "() takes one arguments");
+    }
+    arg0OI = (PrimitiveObjectInspector) argOIs[0];
+
+    // Output: a struct with a single long column.
+    return ObjectInspectorFactory.getStandardStructObjectInspector(
+        Collections.singletonList("longResult"), Collections.singletonList(LONG_TYPE));
+  }
+
+  @Override
+  public void process(Object[] args) throws HiveException {
+    Object arg0 = arg0OI.getPrimitiveJavaObject(args[0]);
+    if (arg0 != null) {
+      long value = (Long) arg0;
+      if (value % 2 == 0) {
+        forward(new Long[] {value});
+      }
+    }
+  }
+}
diff --git
a/backends-velox/src/test/java/org/apache/gluten/udtf/CustomerUDTF.java
b/backends-velox/src/test/java/org/apache/gluten/udtf/CustomerUDTF.java
new file mode 100644
index 0000000000..deac78f8a6
--- /dev/null
+++ b/backends-velox/src/test/java/org/apache/gluten/udtf/CustomerUDTF.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.udtf;
+
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
+import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Test UDTF taking two arguments. For non-null inputs it forwards one row with two columns:
+ * "strResult" (the first argument) and "mapResult", a map holding the pair (arg0, arg1) plus
+ * consecutive token pairs obtained by splitting the second argument on single spaces.
+ */
+public class CustomerUDTF extends GenericUDTF {
+  static final ObjectInspector STR_TYPE = PrimitiveObjectInspectorFactory.javaStringObjectInspector;
+
+  private PrimitiveObjectInspector arg0OI = null;
+
+  private PrimitiveObjectInspector arg1OI = null;
+
+  // Reused across process() calls; cleared before every row is built.
+  private final Map<Object, Object> mapResult = new HashMap<>();
+
+  @Override
+  public void close() throws HiveException {}
+
+  @Override
+  public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
+    // Input: exactly two arguments are accepted.
+    if (argOIs.length != 2) {
+      throw new UDFArgumentException(getClass().getSimpleName() + "() takes two arguments");
+    }
+    arg0OI = (PrimitiveObjectInspector) argOIs[0];
+    arg1OI = (PrimitiveObjectInspector) argOIs[1];
+
+    // Output: a string column followed by a string-to-string map column.
+    final MapObjectInspector mapInspector =
+        ObjectInspectorFactory.getStandardMapObjectInspector(STR_TYPE, STR_TYPE);
+    List<String> columnNames = Arrays.asList("strResult", "mapResult");
+    List<ObjectInspector> columnInspectors = Arrays.asList(STR_TYPE, mapInspector);
+    return ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, columnInspectors);
+  }
+
+  @Override
+  public void process(Object[] args) throws HiveException {
+    final Object first = arg0OI.getPrimitiveJavaObject(args[0]);
+    final Object second = arg1OI.getPrimitiveJavaObject(args[1]);
+    if (first == null || second == null) {
+      return;
+    }
+    mapResult.clear();
+    final String[] tokens = ((String) second).split(" ");
+    mapResult.put(first, second);
+    // Pair up tokens (0,1), (2,3), ...; a trailing unpaired token is ignored.
+    int idx = 0;
+    while (idx + 1 < tokens.length) {
+      mapResult.put(tokens[idx], tokens[idx + 1]);
+      idx += 2;
+    }
+    forward(new Object[] {first, mapResult});
+  }
+}
diff --git
a/backends-velox/src/test/java/org/apache/gluten/udtf/NoInputUDTF.java
b/backends-velox/src/test/java/org/apache/gluten/udtf/NoInputUDTF.java
new file mode 100644
index 0000000000..6699ae5d8e
--- /dev/null
+++ b/backends-velox/src/test/java/org/apache/gluten/udtf/NoInputUDTF.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.udtf;
+
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Test UDTF that ignores its arguments. Every process() call forwards the current counter value
+ * as a one-column row ("longResult") and then increments the counter; close() forwards the
+ * counter once more.
+ */
+public class NoInputUDTF extends GenericUDTF {
+
+  static final ObjectInspector LONG_TYPE = PrimitiveObjectInspectorFactory.javaLongObjectInspector;
+
+  private long result = 0;
+
+  @Override
+  public void close() throws HiveException {
+    forward(new Long[] {result});
+  }
+
+  @Override
+  public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
+    // Output: a struct with a single long column.
+    return ObjectInspectorFactory.getStandardStructObjectInspector(
+        Collections.singletonList("longResult"), Collections.singletonList(LONG_TYPE));
+  }
+
+  @Override
+  public void process(Object[] args) throws HiveException {
+    // Capture the current value first; the counter is bumped before the row is forwarded.
+    final Long[] row = new Long[] {result};
+    result += 1;
+    forward(row);
+  }
+}
diff --git
a/backends-velox/src/test/java/org/apache/gluten/udtf/SimpleUDTF.java
b/backends-velox/src/test/java/org/apache/gluten/udtf/SimpleUDTF.java
new file mode 100644
index 0000000000..0037b711a1
--- /dev/null
+++ b/backends-velox/src/test/java/org/apache/gluten/udtf/SimpleUDTF.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.udtf;
+
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+
+import java.util.*;
+
+public class SimpleUDTF extends GenericUDTF {
+
+ static final ObjectInspector LONG_TYPE =
PrimitiveObjectInspectorFactory.javaLongObjectInspector;
+
+ private PrimitiveObjectInspector arg0OI = null;
+
+ @Override
+ public void close() throws HiveException {}
+
+ @Override
+ public StructObjectInspector initialize(ObjectInspector[] argOIs) throws
UDFArgumentException {
+ // Input
+ if (argOIs.length != 1) {
+ throw new UDFArgumentException(getClass().getSimpleName() + "() takes
one arguments");
+ }
+ arg0OI = (PrimitiveObjectInspector) argOIs[0];
+
+ // Output
+ List<String> fieldNames = Collections.singletonList("longResult");
+ List<ObjectInspector> fieldOIs = Collections.singletonList(LONG_TYPE);
+
+ return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames,
fieldOIs);
+ }
+
+ @Override
+ public void process(Object[] args) throws HiveException {
+ Object arg0 = arg0OI.getPrimitiveJavaObject(args[0]);
+ if (arg0 == null) {
+ return;
+ }
+
+ Object[] forwardObj = new Long[1];
+ forwardObj[0] = ((Long) arg0).longValue();
+ forward(forwardObj);
+ }
+}
diff --git
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/GlutenHiveUDFSuite.scala
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/GlutenHiveUDFSuite.scala
index 0a2cdb5b71..317f509424 100644
---
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/GlutenHiveUDFSuite.scala
+++
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/GlutenHiveUDFSuite.scala
@@ -17,17 +17,19 @@
package org.apache.spark.sql.execution
import org.apache.gluten.config.GlutenConfig
-import org.apache.gluten.execution.ColumnarPartialProjectExec
+import org.apache.gluten.execution.{ColumnarPartialGenerateExec,
ColumnarPartialProjectExec, GlutenQueryComparisonTest}
import org.apache.gluten.expression.UDFMappings
import org.apache.gluten.udf.CustomerUDF
+import org.apache.gluten.udtf.{ConditionalOutputUDTF, CustomerUDTF,
NoInputUDTF, SimpleUDTF}
import org.apache.spark.SparkConf
import org.apache.spark.internal.config
import org.apache.spark.internal.config.UI.UI_ENABLED
-import org.apache.spark.sql.{DataFrame, GlutenQueryTest, Row, SparkSession}
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
import org.apache.spark.sql.classic.ClassicTypes._
+import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.test.SQLTestUtils
@@ -37,7 +39,7 @@ import java.io.File
import scala.collection.mutable
import scala.reflect.ClassTag
-class GlutenHiveUDFSuite extends GlutenQueryTest with SQLTestUtils {
+class GlutenHiveUDFSuite extends GlutenQueryComparisonTest with SQLTestUtils {
private var _spark: SparkSession = _
override protected def beforeAll(): Unit = {
@@ -122,6 +124,76 @@ class GlutenHiveUDFSuite extends GlutenQueryTest with
SQLTestUtils {
}
}
+ // Registers CustomerUDTF as testUDTF and checks the lateral-view plan contains
+ // ColumnarPartialGenerateExec.
+ test("customer udtf") {
+   withTempFunction("testUDTF") {
+     val udtfClass = classOf[CustomerUDTF].getName
+     sql(s"CREATE TEMPORARY FUNCTION testUDTF AS '$udtfClass';")
+     val query =
+       "select l_partkey, col0, col1 from lineitem lateral view" +
+         " testUDTF(l_partkey, l_comment) as col0, col1"
+     runQueryAndCompare(query) {
+       checkOperatorMatch[ColumnarPartialGenerateExec]
+     }
+   }
+ }
+
+ // Registers SimpleUDTF as simpleUDTF and checks the lateral-view plan contains
+ // ColumnarPartialGenerateExec.
+ test("simple udtf") {
+   withTempFunction("simpleUDTF") {
+     val udtfClass = classOf[SimpleUDTF].getName
+     sql(s"CREATE TEMPORARY FUNCTION simpleUDTF AS '$udtfClass'")
+     val query =
+       "select l_partkey, col0 from lineitem lateral view" +
+         " simpleUDTF(l_orderkey) as col0"
+     runQueryAndCompare(query) {
+       checkOperatorMatch[ColumnarPartialGenerateExec]
+     }
+   }
+ }
+
+ // Registers NoInputUDTF as noInputUDTF, calls it with no arguments, and checks the plan
+ // contains ColumnarPartialGenerateExec.
+ test("no argument udtf") {
+   withTempFunction("noInputUDTF") {
+     val udtfClass = classOf[NoInputUDTF].getName
+     sql(s"CREATE TEMPORARY FUNCTION noInputUDTF AS '$udtfClass'")
+     val query =
+       "select l_partkey, col0 from lineitem lateral view" +
+         " noInputUDTF() as col0"
+     runQueryAndCompare(query) {
+       checkOperatorMatch[ColumnarPartialGenerateExec]
+     }
+   }
+ }
+
+ // Same check as the other UDTF tests, but through `lateral view outer` with
+ // ConditionalOutputUDTF.
+ test("lateral view outer udtf") {
+   withTempFunction("conditionalOutputUDTF") {
+     val udtfClass = classOf[ConditionalOutputUDTF].getName
+     sql(s"CREATE TEMPORARY FUNCTION conditionalOutputUDTF AS '$udtfClass'")
+     val query =
+       "select l_partkey, col0 from lineitem lateral view outer" +
+         " conditionalOutputUDTF(l_orderkey) as col0"
+     runQueryAndCompare(query) {
+       checkOperatorMatch[ColumnarPartialGenerateExec]
+     }
+   }
+ }
+
+ // With partial project disabled, the Scala UDF under the generate keeps its child from being
+ // offloaded, so no ColumnarPartialGenerateExec may appear in the executed plan.
+ test("child of GenerateExec is not offloadable") {
+   withTempFunction("testUDTF") {
+     // plus_one is registered on the shared session as well, so give it the same
+     // withTempFunction scoping as testUDTF instead of leaving it behind for later tests.
+     withTempFunction("plus_one") {
+       val plusOne = udf((x: Long) => x + 1)
+       spark.udf.register("plus_one", plusOne)
+       withSQLConf(
+         GlutenConfig.ENABLE_COLUMNAR_PARTIAL_PROJECT.key -> "false"
+       ) {
+         sql(s"CREATE TEMPORARY FUNCTION testUDTF AS '${classOf[CustomerUDTF].getName}'")
+         runQueryAndCompare(
+           "select col0, col1 from (select plus_one(l_partkey) as " +
+             "l_partkey, l_comment from lineitem) lateral view" +
+             " testUDTF(l_partkey, l_comment) as col0, col1",
+           noFallBack = false
+         ) {
+           df =>
+             assert(
+               df.queryExecution.executedPlan
+                 .find(_.isInstanceOf[ColumnarPartialGenerateExec])
+                 .isEmpty)
+         }
+       }
+     }
+   }
+ }
+
test("customer udf wrapped in function") {
withTempFunction("testUDF") {
sql(s"CREATE TEMPORARY FUNCTION testUDF AS
'${classOf[CustomerUDF].getName}'")
diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc
index 8249365fbb..9e4f37687a 100644
--- a/cpp/velox/jni/VeloxJniWrapper.cc
+++ b/cpp/velox/jni/VeloxJniWrapper.cc
@@ -211,6 +211,66 @@ JNIEXPORT jlong JNICALL
Java_org_apache_gluten_columnarbatch_VeloxColumnarBatchJ
JNI_METHOD_END(kInvalidObjectHandle)
}
+// Repeats each row of `repeatedBatchHandle`'s batch according to `rowId2RowNums` and composes the
+// result column-wise with `nonRepeatedBatchHandle`'s batch.
+//
+// rowId2RowNums[i] is the number of output rows that input row i expands to. The repeated batch
+// is not copied: every child vector is wrapped in a dictionary whose indices point back at the
+// source row. Returns the handle of the composed batch saved in the runtime's object store.
+// Invalid input is reported through GLUTEN_CHECK.
+JNIEXPORT jlong JNICALL Java_org_apache_gluten_columnarbatch_VeloxColumnarBatchJniWrapper_repeatedThenCompose( // NOLINT
+    JNIEnv* env,
+    jobject wrapper,
+    jlong repeatedBatchHandle,
+    jlong nonRepeatedBatchHandle,
+    jintArray rowId2RowNums) {
+  JNI_METHOD_START
+  auto ctx = getRuntime(env, wrapper);
+  auto runtime = dynamic_cast<VeloxRuntime*>(ctx);
+  GLUTEN_CHECK(runtime != nullptr, "repeatedThenCompose requires a Velox runtime");
+
+  const int rowId2RowNumsSize = env->GetArrayLength(rowId2RowNums);
+  auto safeRowId2RowNumsArray = getIntArrayElementsSafe(env, rowId2RowNums);
+  const auto* repeatCounts = safeRowId2RowNumsArray.elems();
+
+  auto repeatedBatch = ObjectStore::retrieve<ColumnarBatch>(repeatedBatchHandle);
+  auto nonRepeatedBatch = ObjectStore::retrieve<ColumnarBatch>(nonRepeatedBatchHandle);
+
+  // Every dictionary index written below is a row id in [0, rowId2RowNumsSize), so the repeated
+  // batch must have at least that many rows.
+  GLUTEN_CHECK(
+      rowId2RowNumsSize <= repeatedBatch->numRows(),
+      "More repeat counts than rows in the repeated batch");
+
+  // Sum in 64 bits so that a large total cannot wrap around, and reject negative counts, which
+  // would hand std::fill a reversed range.
+  int64_t totalRows = 0;
+  for (int i = 0; i < rowId2RowNumsSize; ++i) {
+    GLUTEN_CHECK(repeatCounts[i] >= 0, "Repeat count must not be negative");
+    totalRows += repeatCounts[i];
+  }
+  GLUTEN_CHECK(
+      totalRows == static_cast<int64_t>(nonRepeatedBatch->numRows()),
+      "Row numbers after repeated do not match the expected size");
+  // Safe narrowing: totalRows equals a value that already fits the batch's row-count type.
+  const auto rowNums = static_cast<vector_size_t>(totalRows);
+
+  auto veloxPool = runtime->memoryManager()->getLeafMemoryPool();
+
+  // Create an indices vector.
+  // The indices will be used to create a dictionary vector for the first batch.
+  auto repeatedIndices = AlignedBuffer::allocate<vector_size_t>(rowNums, veloxPool.get(), 0);
+  auto* rawRepeatedIndices = repeatedIndices->asMutable<vector_size_t>();
+  vector_size_t lastRowIndexEnd = 0;
+  for (int i = 0; i < rowId2RowNumsSize; ++i) {
+    const auto rowNum = repeatCounts[i];
+    std::fill(rawRepeatedIndices + lastRowIndexEnd, rawRepeatedIndices + lastRowIndexEnd + rowNum, i);
+    lastRowIndexEnd += rowNum;
+  }
+
+  // Wrap repeatedBatch's rowVector in dictionary vectors.
+  auto vb = std::dynamic_pointer_cast<VeloxColumnarBatch>(repeatedBatch);
+  GLUTEN_CHECK(vb != nullptr, "The repeated batch is not a VeloxColumnarBatch");
+  auto rowVector = vb->getRowVector();
+  const auto numChildren = rowVector->childrenSize();
+  std::vector<VectorPtr> outputs(numChildren);
+  for (uint32_t i = 0; i < numChildren; ++i) {
+    outputs[i] = BaseVector::wrapInDictionary(
+        nullptr /*nulls*/,
+        repeatedIndices,
+        rowNums,
+        rowVector->childAt(i));
+  }
+  auto newRowVector = std::make_shared<RowVector>(
+      veloxPool.get(),
+      rowVector->type(),
+      BufferPtr(nullptr),
+      rowNums,
+      std::move(outputs));
+  repeatedBatch = std::make_shared<VeloxColumnarBatch>(std::move(newRowVector));
+  auto newBatch = VeloxColumnarBatch::compose(
+      veloxPool.get(),
+      {std::move(repeatedBatch), std::move(nonRepeatedBatch)});
+  return ctx->saveObject(newBatch);
+  JNI_METHOD_END(kInvalidObjectHandle)
+}
+
JNIEXPORT jlong JNICALL
Java_org_apache_gluten_utils_VeloxBloomFilterJniWrapper_empty( // NOLINT
JNIEnv* env,
jobject wrapper,
diff --git a/docs/Configuration.md b/docs/Configuration.md
index e180e33ffd..e3350b8d87 100644
--- a/docs/Configuration.md
+++ b/docs/Configuration.md
@@ -75,20 +75,21 @@ nav_order: 15
| spark.gluten.sql.columnar.maxBatchSize | 4096
|
| spark.gluten.sql.columnar.overwriteByExpression | true
| Enable or disable columnar v2 command overwrite by expression.
|
| spark.gluten.sql.columnar.parquet.write.blockSize | 128MB
|
-| spark.gluten.sql.columnar.partial.project | true
| Break up one project node into 2 phases when some of the
expressions are non offload-able. Phase one is a regular offloaded project
transformer that evaluates the offload-able expressions in native, phase two
preserves the output from phase one and evaluates the remaining
non-offload-able expressions using vanilla Spark projections
|
-| spark.gluten.sql.columnar.physicalJoinOptimizationLevel | 12
| Fallback to row operators if there are several continuous joins.
|
-| spark.gluten.sql.columnar.physicalJoinOptimizeEnable | false
| Enable or disable columnar physicalJoinOptimize.
|
-| spark.gluten.sql.columnar.preferStreamingAggregate | true
| Velox backend supports `StreamingAggregate`. `StreamingAggregate`
uses the less memory as it does not need to hold all groups in memory, so it
could avoid spill. When true and the child output ordering satisfies the
grouping key then Gluten will choose `StreamingAggregate` as the native
operator. |
-| spark.gluten.sql.columnar.project | true
| Enable or disable columnar project.
|
-| spark.gluten.sql.columnar.project.collapse | true
| Combines two columnar project operators into one and perform alias
substitution
|
-| spark.gluten.sql.columnar.query.fallback.threshold | -1
| The threshold for whether query will fall back by counting the
number of ColumnarToRow & vanilla leaf node.
|
-| spark.gluten.sql.columnar.range | true
| Enable or disable columnar range.
|
-| spark.gluten.sql.columnar.replaceData | true
| Enable or disable columnar v2 command replace data.
|
-| spark.gluten.sql.columnar.scanOnly | false
| When enabled, only scan and the filter after scan will be offloaded
to native.
|
-| spark.gluten.sql.columnar.shuffle | true
| Enable or disable columnar shuffle.
|
-| spark.gluten.sql.columnar.shuffle.celeborn.fallback.enabled | true
| If enabled, fall back to ColumnarShuffleManager when celeborn
service is unavailable.Otherwise, throw an exception.
|
-| spark.gluten.sql.columnar.shuffle.celeborn.useRssSort | true
| If true, use RSS sort implementation for Celeborn sort-based
shuffle.If false, use Gluten's row-based sort implementation. Only valid when
`spark.celeborn.client.spark.shuffle.writer` is set to `sort`.
|
-| spark.gluten.sql.columnar.shuffle.codec |
<undefined> | By default, the supported codecs are lz4 and zstd. When
spark.gluten.sql.columnar.shuffle.codecBackend=qat,the supported codecs are
gzip and zstd.
|
+| spark.gluten.sql.columnar.partial.project | true
| Break up one project node into 2 phases when some of the
expressions are non offload-able. Phase one is a regular offloaded project
transformer that evaluates the offload-able expressions in native, phase two
preserves the output from phase one and evaluates the remaining
non-offload-able expressions using vanilla Spark projections
[...]
+| spark.gluten.sql.columnar.partial.generate | true
| Evaluates the non-offload-able HiveUDTF using vanilla Spark
generator
[...]
+| spark.gluten.sql.columnar.physicalJoinOptimizationLevel | 12
| Fallback to row operators if there are several continuous joins.
[...]
+| spark.gluten.sql.columnar.physicalJoinOptimizeEnable | false
| Enable or disable columnar physicalJoinOptimize.
[...]
+| spark.gluten.sql.columnar.preferStreamingAggregate | true
| Velox backend supports `StreamingAggregate`. `StreamingAggregate`
uses the less memory as it does not need to hold all groups in memory, so it
could avoid spill. When true and the child output ordering satisfies the
grouping key then Gluten will choose `StreamingAggregate` as the native
operator.
[...]
+| spark.gluten.sql.columnar.project | true
| Enable or disable columnar project.
[...]
+| spark.gluten.sql.columnar.project.collapse | true
| Combines two columnar project operators into one and perform alias
substitution
[...]
+| spark.gluten.sql.columnar.query.fallback.threshold | -1
| The threshold for whether query will fall back by counting the
number of ColumnarToRow & vanilla leaf node.
[...]
+| spark.gluten.sql.columnar.range | true
| Enable or disable columnar range.
[...]
+| spark.gluten.sql.columnar.replaceData | true
| Enable or disable columnar v2 command replace data.
[...]
+| spark.gluten.sql.columnar.scanOnly | false
| When enabled, only scan and the filter after scan will be offloaded
to native.
[...]
+| spark.gluten.sql.columnar.shuffle | true
| Enable or disable columnar shuffle.
[...]
+| spark.gluten.sql.columnar.shuffle.celeborn.fallback.enabled | true
| If enabled, fall back to ColumnarShuffleManager when celeborn
service is unavailable.Otherwise, throw an exception.
[...]
+| spark.gluten.sql.columnar.shuffle.celeborn.useRssSort | true
| If true, use RSS sort implementation for Celeborn sort-based
shuffle.If false, use Gluten's row-based sort implementation. Only valid when
`spark.celeborn.client.spark.shuffle.writer` is set to `sort`.
[...]
+| spark.gluten.sql.columnar.shuffle.codec |
<undefined> | By default, the supported codecs are lz4 and zstd. When
spark.gluten.sql.columnar.shuffle.codecBackend=qat,the supported codecs are
gzip and zstd.
[...]
| spark.gluten.sql.columnar.shuffle.codecBackend |
<undefined> |
| spark.gluten.sql.columnar.shuffle.compression.threshold | 100
| If number of rows in a batch falls below this threshold, will copy
all buffers into one buffer to compress.
|
| spark.gluten.sql.columnar.shuffle.dictionary.enabled | false
| Enable dictionary in hash-based shuffle.
|
diff --git
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ArrowWritableColumnVector.java
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ArrowWritableColumnVector.java
index eb50cbcbe8..5491b19ca3 100644
---
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ArrowWritableColumnVector.java
+++
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ArrowWritableColumnVector.java
@@ -19,25 +19,7 @@ package org.apache.gluten.vectorized;
import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators;
import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.vector.BigIntVector;
-import org.apache.arrow.vector.BitVector;
-import org.apache.arrow.vector.DateDayVector;
-import org.apache.arrow.vector.DecimalVector;
-import org.apache.arrow.vector.FieldVector;
-import org.apache.arrow.vector.Float4Vector;
-import org.apache.arrow.vector.Float8Vector;
-import org.apache.arrow.vector.IntVector;
-import org.apache.arrow.vector.NullVector;
-import org.apache.arrow.vector.SmallIntVector;
-import org.apache.arrow.vector.TimeStampMicroTZVector;
-import org.apache.arrow.vector.TimeStampMicroVector;
-import org.apache.arrow.vector.TimeStampVector;
-import org.apache.arrow.vector.TinyIntVector;
-import org.apache.arrow.vector.ValueVector;
-import org.apache.arrow.vector.VarBinaryVector;
-import org.apache.arrow.vector.VarCharVector;
-import org.apache.arrow.vector.VectorLoader;
-import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.*;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.complex.MapVector;
import org.apache.arrow.vector.complex.StructVector;
@@ -110,6 +92,23 @@ public final class ArrowWritableColumnVector extends
WritableColumnVectorShim {
return vectors;
}
+ /**
+  * Builds one writable column per field of {@code schema}, each constructed with
+  * {@code init = true}.
+  *
+  * @param valueCount row count passed to every column's constructor
+  * @param totalSizes per-column byte size, indexed by field position
+  * @param schema Spark schema converted to Arrow using the local time zone id
+  * @return the new columns, in schema order
+  */
+ public static ArrowWritableColumnVector[] allocateColumns(
+     int valueCount, long[] totalSizes, StructType schema) {
+   Schema arrowSchema =
+       SparkArrowUtil.toArrowSchema(schema, SparkSchemaUtil.getLocalTimezoneID());
+   List<FieldVector> fields =
+       VectorSchemaRoot.create(arrowSchema, ArrowBufferAllocators.contextInstance())
+           .getFieldVectors();
+
+   ArrowWritableColumnVector[] columns = new ArrowWritableColumnVector[fields.size()];
+   int ordinal = 0;
+   for (FieldVector field : fields) {
+     columns[ordinal] =
+         new ArrowWritableColumnVector(
+             field, null, ordinal, valueCount, totalSizes[ordinal], true);
+     ordinal++;
+   }
+   return columns;
+ }
+
public static ArrowWritableColumnVector[] loadColumns(
int capacity, List<FieldVector> fieldVectors, List<FieldVector>
dictionaryVectors) {
if (fieldVectors.size() != dictionaryVectors.size()) {
@@ -169,6 +168,41 @@ public final class ArrowWritableColumnVector extends
WritableColumnVectorShim {
createVectorAccessor(vector, dicionaryVector);
}
+ /**
+  * Wraps {@code vector} and, when {@code init} is set, allocates its buffers up front.
+  *
+  * <p>Allocation depends on the vector kind: variable-width vectors get {@code totalBytes} of
+  * data for {@code capacity} values; density-aware vectors get an initial capacity with a
+  * per-value density derived from {@code totalBytes / valueCount}; all others get a plain
+  * initial capacity.
+  *
+  * @param vector Arrow vector to wrap; its field determines the Spark data type
+  * @param dicionaryVector dictionary vector handed to the accessor, may be null
+  * @param ordinal value stored as this column's ordinal
+  * @param valueCount capacity passed to the superclass
+  * @param totalBytes byte size used to size the allocation
+  * @param init whether to allocate the vector's buffers here
+  */
+ public ArrowWritableColumnVector(
+     ValueVector vector,
+     ValueVector dicionaryVector,
+     int ordinal,
+     int valueCount,
+     long totalBytes,
+     boolean init) {
+   super(valueCount, SparkArrowUtil.fromArrowField(vector.getField()));
+   vectorCount.getAndIncrement();
+   refCnt.getAndIncrement();
+
+   this.ordinal = ordinal;
+   this.vector = vector;
+   this.dictionaryVector = dicionaryVector;
+
+   if (init) {
+     if (vector instanceof VariableWidthVector) {
+       ((VariableWidthVector) vector).allocateNew(totalBytes, capacity);
+     } else {
+       if (vector instanceof DensityAwareVector) {
+         double bytesPerValue = 1.0 * totalBytes / valueCount;
+         // NOTE(review): the reason list vectors use the square root is not visible here.
+         double density =
+             vector instanceof ListVector ? Math.sqrt(bytesPerValue) : bytesPerValue;
+         ((DensityAwareVector) vector).setInitialCapacity(capacity, density);
+       } else {
+         vector.setInitialCapacity(capacity);
+       }
+       vector.allocateNew();
+     }
+   }
+
+   writer = createVectorWriter(vector);
+   createVectorAccessor(vector, dicionaryVector);
+ }
+
public ArrowWritableColumnVector(int capacity, DataType dataType) {
super(capacity, dataType);
vectorCount.getAndIncrement();
@@ -753,6 +787,10 @@ public final class ArrowWritableColumnVector extends
WritableColumnVectorShim {
writer.write(input, ordinal);
}
+ /** Appends the value at {@code ordinal} of {@code input} by delegating to the writer's writeUnsafe. */
+ public void writeUnsafe(SpecializedGetters input, int ordinal) {
+ writer.writeUnsafe(input, ordinal);
+ }
+
public void finishWrite() {
writer.finish();
}
@@ -1414,6 +1452,8 @@ public final class ArrowWritableColumnVector extends
WritableColumnVectorShim {
abstract void setValueNullSafe(SpecializedGetters input, int ordinal);
+ // Counterpart of setValueNullSafe used by writeUnsafe; called only after the null check there.
+ // NOTE(review): "unsafe" presumably means buffers are pre-sized so no capacity check is
+ // needed - confirm against the caller that allocates the columns.
+ abstract void unsafeSetValueNullSafe(SpecializedGetters input, int
ordinal);
+
void write(SpecializedGetters input, int ordinal) {
if (input.isNullAt(ordinal)) {
setNull(count);
@@ -1423,6 +1463,15 @@ public final class ArrowWritableColumnVector extends
WritableColumnVectorShim {
count = count + 1;
}
+ /**
+  * Appends one value at position {@code count}: a null marker when the input is null at
+  * {@code ordinal}, otherwise whatever unsafeSetValueNullSafe writes. Advances {@code count}.
+  */
+ void writeUnsafe(SpecializedGetters input, int ordinal) {
+   if (!input.isNullAt(ordinal)) {
+     unsafeSetValueNullSafe(input, ordinal);
+   } else {
+     setNull(count);
+   }
+   count += 1;
+ }
+
void finish() {
vector.setValueCount(count);
}
@@ -1469,6 +1518,11 @@ public final class ArrowWritableColumnVector extends
WritableColumnVectorShim {
void setValueNullSafe(SpecializedGetters input, int ordinal) {
this.setBoolean(count, input.getBoolean(ordinal));
}
+
+ // Stores the boolean at `ordinal` into slot `count` through writer.set, encoding true as 1
+ // and false as 0.
+ @Override
+ void unsafeSetValueNullSafe(SpecializedGetters input, int ordinal) {
+ writer.set(count, input.getBoolean(ordinal) ? 1 : 0);
+ }
}
private static class ByteWriter extends ArrowVectorWriter {
@@ -1514,6 +1568,11 @@ public final class ArrowWritableColumnVector extends
WritableColumnVectorShim {
void setValueNullSafe(SpecializedGetters input, int ordinal) {
this.setByte(count, input.getByte(ordinal));
}
+
+ // Stores the byte at `ordinal` into slot `count` through writer.set.
+ @Override
+ void unsafeSetValueNullSafe(SpecializedGetters input, int ordinal) {
+ writer.set(count, input.getByte(ordinal));
+ }
}
private static class ShortWriter extends ArrowVectorWriter {
@@ -1566,6 +1625,11 @@ public final class ArrowWritableColumnVector extends
WritableColumnVectorShim {
void setValueNullSafe(SpecializedGetters input, int ordinal) {
this.setShort(count, input.getShort(ordinal));
}
+
+ // Stores the short at `ordinal` into slot `count` through writer.set, matching the other
+ // fixed-width writers' unsafe path (the previous code went through setSafe here).
+ @Override
+ void unsafeSetValueNullSafe(SpecializedGetters input, int ordinal) {
+   writer.set(count, input.getShort(ordinal));
+ }
}
private static class IntWriter extends ArrowVectorWriter {
@@ -1631,6 +1695,11 @@ public final class ArrowWritableColumnVector extends
WritableColumnVectorShim {
void setValueNullSafe(SpecializedGetters input, int ordinal) {
setInt(count, input.getInt(ordinal));
}
+
+ // Stores the int at `ordinal` into slot `count` through writer.set.
+ @Override
+ void unsafeSetValueNullSafe(SpecializedGetters input, int ordinal) {
+ writer.set(count, input.getInt(ordinal));
+ }
}
private static class LongWriter extends ArrowVectorWriter {
@@ -1702,6 +1771,11 @@ public final class ArrowWritableColumnVector extends
WritableColumnVectorShim {
writer.setSafe(rowId + i, tmp);
}
}
+
+ // Stores the long at `ordinal` into slot `count` through writer.set.
+ @Override
+ void unsafeSetValueNullSafe(SpecializedGetters input, int ordinal) {
+ writer.set(count, input.getLong(ordinal));
+ }
}
private static class FloatWriter extends ArrowVectorWriter {
@@ -1747,6 +1821,11 @@ public final class ArrowWritableColumnVector extends
WritableColumnVectorShim {
void setValueNullSafe(SpecializedGetters input, int ordinal) {
setFloat(count, input.getFloat(ordinal));
}
+
+ // Stores the float at `ordinal` into slot `count` through writer.set.
+ @Override
+ void unsafeSetValueNullSafe(SpecializedGetters input, int ordinal) {
+ writer.set(count, input.getFloat(ordinal));
+ }
}
private static class DoubleWriter extends ArrowVectorWriter {
@@ -1792,6 +1871,11 @@ public final class ArrowWritableColumnVector extends
WritableColumnVectorShim {
void setValueNullSafe(SpecializedGetters input, int ordinal) {
setDouble(count, input.getDouble(ordinal));
}
+
+ // Stores the double at `ordinal` into slot `count` through writer.set.
+ @Override
+ void unsafeSetValueNullSafe(SpecializedGetters input, int ordinal) {
+ writer.set(count, input.getDouble(ordinal));
+ }
}
private static class DecimalWriter extends ArrowVectorWriter {
@@ -1846,6 +1930,16 @@ public final class ArrowWritableColumnVector extends
WritableColumnVectorShim {
setNull(count);
}
}
+
+ // Stores the decimal at `ordinal` into slot `count`, or marks the slot null when the value
+ // cannot be represented with this writer's precision and scale.
+ @Override
+ void unsafeSetValueNullSafe(SpecializedGetters input, int ordinal) {
+   Decimal value = input.getDecimal(ordinal, precision, scale);
+   if (!value.changePrecision(precision, scale)) {
+     setNull(count);
+     return;
+   }
+   writer.set(count, value.toJavaBigDecimal());
+ }
}
private static class StringWriter extends ArrowVectorWriter {
@@ -1886,6 +1980,12 @@ public final class ArrowWritableColumnVector extends
WritableColumnVectorShim {
UTF8String value = input.getUTF8String(ordinal);
setBytes(count, value.numBytes(), value.getBytes(), 0);
}
+
+ // Copies all bytes of the UTF8String at `ordinal` into slot `count` through writer.set.
+ @Override
+ void unsafeSetValueNullSafe(SpecializedGetters input, int ordinal) {
+ UTF8String value = input.getUTF8String(ordinal);
+ writer.set(count, value.getBytes(), 0, value.numBytes());
+ }
}
private static class BinaryWriter extends ArrowVectorWriter {
@@ -1918,6 +2018,12 @@ public final class ArrowWritableColumnVector extends
WritableColumnVectorShim {
byte[] value = input.getBinary(ordinal);
setBytes(count, value.length, value, 0);
}
+
+ // Copies the whole byte array at `ordinal` into slot `count` through writer.set.
+ @Override
+ void unsafeSetValueNullSafe(SpecializedGetters input, int ordinal) {
+ byte[] value = input.getBinary(ordinal);
+ writer.set(count, value, 0, value.length);
+ }
}
private static class DateWriter extends ArrowVectorWriter {
@@ -1956,6 +2062,11 @@ public final class ArrowWritableColumnVector extends
WritableColumnVectorShim {
void setValueNullSafe(SpecializedGetters input, int ordinal) {
setInt(count, input.getInt(ordinal));
}
+
+ // Stores the date, read from the input as an int, into slot `count` through writer.set.
+ @Override
+ void unsafeSetValueNullSafe(SpecializedGetters input, int ordinal) {
+ writer.set(count, input.getInt(ordinal));
+ }
}
private static class TimestampMicroWriter extends ArrowVectorWriter {
@@ -1994,6 +2105,11 @@ public final class ArrowWritableColumnVector extends
WritableColumnVectorShim {
void setValueNullSafe(SpecializedGetters input, int ordinal) {
setLong(count, input.getLong(ordinal));
}
+
+ // Stores the timestamp, read from the input as a long, into slot `count` through writer.set.
+ @Override
+ void unsafeSetValueNullSafe(SpecializedGetters input, int ordinal) {
+ writer.set(count, input.getLong(ordinal));
+ }
}
private static class ArrayWriter extends ArrowVectorWriter {
@@ -2029,6 +2145,18 @@ public final class ArrowWritableColumnVector extends
WritableColumnVectorShim {
writer.endValue(count, arrayData.numElements());
}
+ // Writes the array at `ordinal` as list entry `count`.
+ @Override
+ void unsafeSetValueNullSafe(SpecializedGetters input, int ordinal) {
+   ArrayData elements = input.getArray(ordinal);
+   int numElements = elements.numElements();
+   writer.startNewValue(count);
+   // Only the value buffer is known to be large enough; the child's validity buffer may still
+   // need to grow, so elements go through the checked write() path.
+   int idx = 0;
+   while (idx < numElements) {
+     elementWriter.write(elements, idx);
+     idx++;
+   }
+   writer.endValue(count, numElements);
+ }
+
@Override
void finish() {
super.finish();
@@ -2075,6 +2203,17 @@ public final class ArrowWritableColumnVector extends
WritableColumnVectorShim {
}
}
+ // Marks struct slot `count` as defined and writes each field through its child writer.
+ @Override
+ void unsafeSetValueNullSafe(SpecializedGetters input, int ordinal) {
+   InternalRow row = input.getStruct(ordinal, childrenWriter.length);
+   writer.setIndexDefined(count);
+   // Only the value buffers are known to be large enough; a child's validity buffer may still
+   // need to grow, so fields go through the checked write() path.
+   int numFields = row.numFields();
+   for (int field = 0; field < numFields; field++) {
+     childrenWriter[field].write(row, field);
+   }
+ }
+
@Override
void finish() {
super.finish();
@@ -2123,6 +2262,23 @@ public final class ArrowWritableColumnVector extends
WritableColumnVectorShim {
writer.endValue(count, mapData.numElements());
}
+ // Writes the map at `ordinal` as entry `count`: all keys first, then all values.
+ @Override
+ void unsafeSetValueNullSafe(SpecializedGetters input, int ordinal) {
+   MapData entries = input.getMap(ordinal);
+   writer.startNewValue(count);
+   ArrayData keys = entries.keyArray();
+   ArrayData values = entries.valueArray();
+   int numEntries = entries.numElements();
+   // Only the value buffers are known to be large enough; the children's validity buffers may
+   // still need to grow, so keys and values go through the checked write() path.
+   for (int k = 0; k < numEntries; k++) {
+     keyWriter.write(keys, k);
+   }
+   for (int v = 0; v < numEntries; v++) {
+     valueWriter.write(values, v);
+   }
+   writer.endValue(count, numEntries);
+ }
+
@Override
void finish() {
super.finish();
@@ -2155,5 +2311,10 @@ public final class ArrowWritableColumnVector extends
WritableColumnVectorShim {
void setValueNullSafe(SpecializedGetters input, int ordinal) {
setNull(count);
}
+
+ // Grows the writer's value count by one instead of storing anything.
+ // NOTE(review): setValueNullSafe in this class calls setNull(count) instead - confirm the two
+ // paths are meant to differ.
+ @Override
+ void unsafeSetValueNullSafe(SpecializedGetters input, int ordinal) {
+ writer.setValueCount(writer.getValueCount() + 1);
+ }
}
}
diff --git
a/gluten-arrow/src/main/scala/org/apache/gluten/expression/InterpretedArrowGenerate.scala
b/gluten-arrow/src/main/scala/org/apache/gluten/expression/InterpretedArrowGenerate.scala
new file mode 100644
index 0000000000..37f9d1aed8
--- /dev/null
+++
b/gluten-arrow/src/main/scala/org/apache/gluten/expression/InterpretedArrowGenerate.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.expression
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute,
ExpressionsEvaluator, Generator}
+import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences
+
+/**
+ * Evaluates a [[Generator]] row by row: applying an instance to an input row calls `eval` on the
+ * wrapped generator and returns the produced rows, or `None` when nothing was produced.
+ * `terminate` does the same for the generator's `terminate` call.
+ *
+ * Results are cast to `Seq` without conversion. Per the original author, `HiveGenericUDTF` stores
+ * its results in an `ArrayBuffer`, so the value returned by its `eval` is already a `Seq`.
+ *
+ * @param generator
+ *   the generator expression to evaluate; the two-argument constructor first binds its
+ *   references against `inputSchema`.
+ */
+case class InterpretedArrowGenerate(generator: Generator)
+ extends (InternalRow => Option[Seq[InternalRow]])
+ with ExpressionsEvaluator {
+ def this(generator: Generator, inputSchema: Seq[Attribute]) =
+ this(bindReferences(Seq(generator), inputSchema).head)
+
+ override def apply(input: InternalRow): Option[Seq[InternalRow]] = {
+ val resultRows = generator.eval(input)
+ if (resultRows.isEmpty) {
+ None
+ } else {
+ Some(resultRows.asInstanceOf[Seq[InternalRow]])
+ }
+ }
+
+ def terminate(): Option[Seq[InternalRow]] = {
+ val resultRows = generator.terminate()
+ if (resultRows.isEmpty) {
+ None
+ } else {
+ Some(resultRows.asInstanceOf[Seq[InternalRow]])
+ }
+ }
+}
+
+/** Factory for [[InterpretedArrowGenerate]]. */
+object InterpretedArrowGenerate {
+ /** Wraps `generator` as given, without binding references. */
+ def create(generator: Generator): InterpretedArrowGenerate = {
+ new InterpretedArrowGenerate(generator)
+ }
+}
diff --git
a/gluten-arrow/src/main/scala/org/apache/gluten/vectorized/ArrowColumnarRow.scala
b/gluten-arrow/src/main/scala/org/apache/gluten/vectorized/ArrowColumnarRow.scala
index 36dfddb820..e5452e4ae5 100644
---
a/gluten-arrow/src/main/scala/org/apache/gluten/vectorized/ArrowColumnarRow.scala
+++
b/gluten-arrow/src/main/scala/org/apache/gluten/vectorized/ArrowColumnarRow.scala
@@ -219,6 +219,16 @@ final class ArrowColumnarRow(writableColumns:
Array[ArrowWritableColumnVector])
}
}
+ /**
+  * Appends every field of `input` to the column with the same index via `writeUnsafe`.
+  *
+  * Throws a GlutenException when the row's field count differs from the number of columns.
+  */
+ def writeRowUnsafe(input: InternalRow): Unit = {
+   val numFields = input.numFields
+   if (numFields != columns.length) {
+     throw new GlutenException(
+       "The numFields of input row should be equal to the number of column vector!")
+   }
+   var i = 0
+   while (i < numFields) {
+     columns(i).writeUnsafe(input, i)
+     i += 1
+   }
+ }
+
def finishWriteRow(): Unit = {
var i = 0
while (i < columns.length) {
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
index c8c80e0901..6c28443079 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
@@ -324,6 +324,8 @@ class GlutenConfig(conf: SQLConf) extends
GlutenCoreConfig(conf) {
def enableColumnarPartialProject: Boolean =
getConf(ENABLE_COLUMNAR_PARTIAL_PROJECT)
+ // Reads ENABLE_COLUMNAR_PARTIAL_GENERATE from the session conf.
+ def enableColumnarPartialGenerate: Boolean =
getConf(ENABLE_COLUMNAR_PARTIAL_GENERATE)
+
def enableCastAvgAggregateFunction: Boolean =
getConf(COLUMNAR_NATIVE_CAST_AGGREGATE_ENABLED)
def enableHiveFileFormatWriter: Boolean =
getConf(NATIVE_HIVEFILEFORMAT_WRITER_ENABLED)
@@ -1373,6 +1375,12 @@ object GlutenConfig {
.booleanConf
.createWithDefault(true)
+ // Boolean switch for columnar partial generate of HiveUDTF; enabled by default.
+ val ENABLE_COLUMNAR_PARTIAL_GENERATE =
+ buildConf("spark.gluten.sql.columnar.partial.generate")
+ .doc("Evaluates the non-offload-able HiveUDTF using vanilla Spark
generator")
+ .booleanConf
+ .createWithDefault(true)
+
val ENABLE_COMMON_SUBEXPRESSION_ELIMINATE =
buildConf("spark.gluten.sql.commonSubexpressionEliminate")
.internal()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]