waitinfuture commented on code in PR #7360: URL: https://github.com/apache/incubator-gluten/pull/7360#discussion_r1799517460
########## backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala: ########## @@ -0,0 +1,413 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.execution + +import org.apache.gluten.GlutenConfig +import org.apache.gluten.columnarbatch.ColumnarBatches +import org.apache.gluten.expression.ExpressionUtils +import org.apache.gluten.extension.{GlutenPlan, ValidationResult} +import org.apache.gluten.extension.columnar.validator.Validator.Passed +import org.apache.gluten.iterator.Iterators +import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators +import org.apache.gluten.sql.shims.SparkShimLoader +import org.apache.gluten.vectorized.ArrowWritableColumnVector + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, CaseWhen, Coalesce, Expression, If, LambdaFunction, MutableProjection, NamedExpression, NaNvl, ScalaUDF, UnsafeProjection} +import org.apache.spark.sql.execution.{ExplainUtils, ProjectExec, SparkPlan, UnaryExecNode} +import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} +import 
org.apache.spark.sql.execution.vectorized.{MutableColumnarRow, WritableColumnVector} +import org.apache.spark.sql.hive.HiveUdfUtil +import org.apache.spark.sql.types.{BinaryType, BooleanType, ByteType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, NullType, ShortType, StringType, TimestampType, YearMonthIntervalType} +import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} + +import scala.collection.mutable.ListBuffer + +/** + * By rule <PartialProhectRule>, the project not offoload-able that is changed to + * ProjectExecTransformer + ColumnarPartialProjectExec e.g. sum(myudf(a) + b + hash(c)), child is + * (a, b, c) ColumnarPartialProjectExec (a, b, c, myudf(a)), ProjectExecTransformer(myudf(a) + b + + * hash(c)) + * + * @param original + * extract the ScalaUDF from original project list as Alias in UnsafeProjection and + * AttributeReference in ColumnarPartialProjectExec output + * @param child + * child plan + */ +case class ColumnarPartialProjectExec(original: ProjectExec, child: SparkPlan)( + replacedAliasUdf: ListBuffer[Alias]) + extends UnaryExecNode + with GlutenPlan { + + private val debug = GlutenConfig.getConf.debug + + private val projectAttributes: ListBuffer[Attribute] = ListBuffer() + private val projectIndexInChild: ListBuffer[Int] = ListBuffer() + private var UDFAttrNotExists = false + private var hasUnsupportedDataType = replacedAliasUdf.exists(a => !validateDataType(a.dataType)) + if (!hasUnsupportedDataType) { + getProjectIndexInChildOutput(replacedAliasUdf) + } + + @transient override lazy val metrics = Map( + "time" -> SQLMetrics.createTimingMetric(sparkContext, "total time of partial project"), + "column_to_row_time" -> SQLMetrics.createTimingMetric( + sparkContext, + "time of velox to Arrow ColumnarBatch or UnsafeRow"), + "row_to_column_time" -> SQLMetrics.createTimingMetric( + sparkContext, + "time of Arrow ColumnarBatch or UnsafeRow to velox") + ) + + override def output: Seq[Attribute] = 
child.output ++ replacedAliasUdf.map(_.toAttribute) Review Comment: No need to include all child's output if only used by `replacedAliasUdf` ########## backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala: ########## @@ -0,0 +1,413 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gluten.execution + +import org.apache.gluten.GlutenConfig +import org.apache.gluten.columnarbatch.ColumnarBatches +import org.apache.gluten.expression.ExpressionUtils +import org.apache.gluten.extension.{GlutenPlan, ValidationResult} +import org.apache.gluten.extension.columnar.validator.Validator.Passed +import org.apache.gluten.iterator.Iterators +import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators +import org.apache.gluten.sql.shims.SparkShimLoader +import org.apache.gluten.vectorized.ArrowWritableColumnVector + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, CaseWhen, Coalesce, Expression, If, LambdaFunction, MutableProjection, NamedExpression, NaNvl, ScalaUDF, UnsafeProjection} +import org.apache.spark.sql.execution.{ExplainUtils, ProjectExec, SparkPlan, UnaryExecNode} +import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} +import org.apache.spark.sql.execution.vectorized.{MutableColumnarRow, WritableColumnVector} +import org.apache.spark.sql.hive.HiveUdfUtil +import org.apache.spark.sql.types.{BinaryType, BooleanType, ByteType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, NullType, ShortType, StringType, TimestampType, YearMonthIntervalType} +import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} + +import scala.collection.mutable.ListBuffer + +/** + * By rule <PartialProhectRule>, the project not offoload-able that is changed to + * ProjectExecTransformer + ColumnarPartialProjectExec e.g. 
sum(myudf(a) + b + hash(c)), child is + * (a, b, c) ColumnarPartialProjectExec (a, b, c, myudf(a)), ProjectExecTransformer(myudf(a) + b + + * hash(c)) + * + * @param original + * extract the ScalaUDF from original project list as Alias in UnsafeProjection and + * AttributeReference in ColumnarPartialProjectExec output + * @param child + * child plan + */ +case class ColumnarPartialProjectExec(original: ProjectExec, child: SparkPlan)( + replacedAliasUdf: ListBuffer[Alias]) + extends UnaryExecNode + with GlutenPlan { + + private val debug = GlutenConfig.getConf.debug + + private val projectAttributes: ListBuffer[Attribute] = ListBuffer() + private val projectIndexInChild: ListBuffer[Int] = ListBuffer() + private var UDFAttrNotExists = false + private var hasUnsupportedDataType = replacedAliasUdf.exists(a => !validateDataType(a.dataType)) + if (!hasUnsupportedDataType) { + getProjectIndexInChildOutput(replacedAliasUdf) + } + + @transient override lazy val metrics = Map( + "time" -> SQLMetrics.createTimingMetric(sparkContext, "total time of partial project"), + "column_to_row_time" -> SQLMetrics.createTimingMetric( + sparkContext, + "time of velox to Arrow ColumnarBatch or UnsafeRow"), + "row_to_column_time" -> SQLMetrics.createTimingMetric( + sparkContext, + "time of Arrow ColumnarBatch or UnsafeRow to velox") + ) + + override def output: Seq[Attribute] = child.output ++ replacedAliasUdf.map(_.toAttribute) + + final override def doExecute(): RDD[InternalRow] = { + throw new UnsupportedOperationException( + s"${this.getClass.getSimpleName} doesn't support doExecute") + } + + final override protected def otherCopyArgs: Seq[AnyRef] = { + replacedAliasUdf :: Nil + } + + final override val supportsColumnar: Boolean = true + + private def validateExpression(expr: Expression): Boolean = { + expr.deterministic && !expr.isInstanceOf[LambdaFunction] && expr.children + .forall(validateExpression) + } + + private def validateDataType(dataType: DataType): Boolean = { + dataType 
match { + case _: BooleanType => true + case _: ByteType => true + case _: ShortType => true + case _: IntegerType => true + case _: LongType => true + case _: FloatType => true + case _: DoubleType => true + case _: StringType => true + case _: TimestampType => true + case _: DateType => true + case _: BinaryType => true + case _: DecimalType => true + case YearMonthIntervalType.DEFAULT => true + case _: NullType => true + case _ => false Review Comment: Is there a particular reason not to support complex types? ########## backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala: ########## @@ -0,0 +1,406 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.gluten.execution + +import org.apache.gluten.GlutenConfig +import org.apache.gluten.columnarbatch.{ColumnarBatches, VeloxColumnarBatches} +import org.apache.gluten.expression.ExpressionUtils +import org.apache.gluten.extension.{GlutenPlan, ValidationResult} +import org.apache.gluten.extension.columnar.validator.Validator.Passed +import org.apache.gluten.iterator.Iterators +import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators +import org.apache.gluten.sql.shims.SparkShimLoader +import org.apache.gluten.vectorized.ArrowWritableColumnVector + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, CaseWhen, Coalesce, Expression, If, LambdaFunction, MutableProjection, NamedExpression, NaNvl, ScalaUDF, UnsafeProjection} +import org.apache.spark.sql.execution.{ExplainUtils, ProjectExec, SparkPlan, UnaryExecNode} +import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} +import org.apache.spark.sql.execution.vectorized.{MutableColumnarRow, WritableColumnVector} +import org.apache.spark.sql.hive.HiveUdfUtil +import org.apache.spark.sql.types.{BinaryType, BooleanType, ByteType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, NullType, ShortType, StringType, TimestampType, YearMonthIntervalType} +import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} + +import scala.collection.mutable.ListBuffer + +/** + * By rule <PartialProhectRule>, the project not offoload-able that is changed to + * ProjectExecTransformer + ColumnarPartialProjectExec e.g. 
sum(myudf(a) + b + hash(c)), child is + * (a, b, c) ColumnarPartialProjectExec (a, b, c, myudf(a)), ProjectExecTransformer(myudf(a) + b + + * hash(c)) + * + * @param original + * extract the ScalaUDF from original project list as Alias in UnsafeProjection and + * AttributeReference in ColumnarPartialProjectExec output + * @param child + * child plan + */ +case class ColumnarPartialProjectExec(original: ProjectExec, child: SparkPlan)( + replacedAliasUdf: ListBuffer[Alias]) + extends UnaryExecNode + with GlutenPlan { + + private val debug = GlutenConfig.getConf.debug + + private val projectAttributes: ListBuffer[Attribute] = ListBuffer() + private val projectIndexInChild: ListBuffer[Int] = ListBuffer() + private var UDFAttrNotExists = false + private var hasUnsupportedDataType = replacedAliasUdf.exists(a => !validateDataType(a.dataType)) + if (!hasUnsupportedDataType) { + getProjectIndexInChildOutput(replacedAliasUdf) + } + + @transient override lazy val metrics = Map( + "time" -> SQLMetrics.createTimingMetric(sparkContext, "total time of partial project"), + "column_to_row_time" -> SQLMetrics.createTimingMetric( + sparkContext, + "time of velox to Arrow ColumnarBatch or UnsafeRow"), + "row_to_column_time" -> SQLMetrics.createTimingMetric( + sparkContext, + "time of Arrow ColumnarBatch or UnsafeRow to velox") + ) + + override def output: Seq[Attribute] = child.output ++ replacedAliasUdf.map(_.toAttribute) + + final override def doExecute(): RDD[InternalRow] = { + throw new UnsupportedOperationException( + s"${this.getClass.getSimpleName} doesn't support doExecute") + } + + final override protected def otherCopyArgs: Seq[AnyRef] = { + replacedAliasUdf :: Nil + } + + final override val supportsColumnar: Boolean = true + + private def validateExpression(expr: Expression): Boolean = { + expr.deterministic && !expr.isInstanceOf[LambdaFunction] && expr.children + .forall(validateExpression) + } + + private def validateDataType(dataType: DataType): Boolean = { + dataType 
match { + case _: BooleanType => true + case _: ByteType => true + case _: ShortType => true + case _: IntegerType => true + case _: LongType => true + case _: FloatType => true + case _: DoubleType => true + case _: StringType => true + case _: TimestampType => true + case _: DateType => true + case _: BinaryType => true + case _: DecimalType => true + case YearMonthIntervalType.DEFAULT => true + case _: NullType => true + case _ => false + } + } + + private def getProjectIndexInChildOutput(exprs: Seq[Expression]): Unit = { + exprs.foreach { + case a: AttributeReference => + val index = child.output.indexWhere(s => s.exprId.equals(a.exprId)) + // Some child operator as HashAggregateTransformer will not have udf child column + if (index < 0) { + UDFAttrNotExists = true + log.debug(s"Expression $a should exist in child output ${child.output}") + return + } else if (!validateDataType(a.dataType)) { + hasUnsupportedDataType = true + log.debug(s"Expression $a contains unsupported data type ${a.dataType}") + } else if (!projectIndexInChild.contains(index)) { + projectAttributes.append(a.toAttribute) + projectIndexInChild.append(index) + } + case p => getProjectIndexInChildOutput(p.children) + } + } + + override protected def doValidateInternal(): ValidationResult = { + if (!GlutenConfig.getConf.enableColumnarPartialProject) { + return ValidationResult.failed("Config disable this feature") + } + if (UDFAttrNotExists) { + return ValidationResult.failed("Attribute in the UDF does not exists in its child") + } + if (hasUnsupportedDataType) { + return ValidationResult.failed("Attribute in the UDF contains unsupported type") + } + if (projectAttributes.size == child.output.size) { + return ValidationResult.failed("UDF need all the columns in child output") + } + if (original.output.isEmpty) { + return ValidationResult.failed("Project fallback because output is empty") + } + if (replacedAliasUdf.isEmpty) { + return ValidationResult.failed("No UDF") + } + if 
(replacedAliasUdf.size > original.output.size) { + // e.g. udf1(col) + udf2(col), it will introduce 2 cols for r2c + return ValidationResult.failed("Number of RowToColumn columns is more than ProjectExec") + } + if (!original.projectList.forall(validateExpression(_))) { + return ValidationResult.failed("Contains expression not supported") + } + if (isComplexExpression()) { + return ValidationResult.failed("Fallback by complex expression") + } + ValidationResult.succeeded + } + + private def isComplexExpression(): Boolean = { + new ExpressionUtils.FallbackComplexExpressions( + GlutenConfig.getConf.fallbackExpressionsThreshold) + .validate(original) match { + case Passed => false + case _ => true + } + } + + override protected def doExecuteColumnar(): RDD[ColumnarBatch] = { + val totalTime = longMetric("time") + val c2r = longMetric("column_to_row_time") + val r2c = longMetric("row_to_column_time") + val isMutable = canUseMutableProjection() + child.executeColumnar().mapPartitions { + batches => + val res: Iterator[Iterator[ColumnarBatch]] = new Iterator[Iterator[ColumnarBatch]] { + override def hasNext: Boolean = batches.hasNext + + override def next(): Iterator[ColumnarBatch] = { + val batch = batches.next() + if (batch.numRows == 0) { + Iterator.empty + } else { + val start = System.currentTimeMillis() + val childData = ColumnarBatches.select(batch, projectIndexInChild.toArray) + val projectedBatch = if (isMutable) { + getProjectedBatchArrow(childData, c2r, r2c) + } else getProjectedBatch(childData, c2r, r2c) + val batchIterator = projectedBatch.map { + b => + if (b.numCols() != 0) { + val compositeBatch = VeloxColumnarBatches.compose(batch, b) + b.close() + compositeBatch + } else { + b.close() + ColumnarBatches.retain(batch) + batch + } + } + childData.close() + totalTime += System.currentTimeMillis() - start + batchIterator + } + } + } + Iterators + .wrap(res.flatten) + .protectInvocationFlow() // Spark may call `hasNext()` again after a false output which + // 
is not allowed by Gluten iterators. E.g. GroupedIterator#fetchNextGroupIterator + .recyclePayload(_.close()) + .create() + + } + } + + // scalastyle:off line.size.limit + // String type cannot use MutableProjection + // Otherwise will throw java.lang.UnsupportedOperationException: Datatype not supported StringType + // at org.apache.spark.sql.execution.vectorized.MutableColumnarRow.update(MutableColumnarRow.java:224) + // at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown Source) + // scalastyle:on line.size.limit + private def canUseMutableProjection(): Boolean = { + replacedAliasUdf.forall( + r => + r.dataType match { + case StringType | BinaryType => false + case _ => true + }) + } + + /** + * add c2r and r2c for unsupported expression child data c2r get Iterator[InternalRow], then call + * Spark project, then r2c + */ + private def getProjectedBatch( + childData: ColumnarBatch, + c2r: SQLMetric, + r2c: SQLMetric): Iterator[ColumnarBatch] = { + // select part of child output and child data + val proj = UnsafeProjection.create(replacedAliasUdf, projectAttributes) + val numOutputRows = new SQLMetric("numOutputRows") + val numInputBatches = new SQLMetric("numInputBatches") + val rows = VeloxColumnarToRowExec + .toRowIterator( + Iterator.single[ColumnarBatch](childData), + projectAttributes, + numOutputRows, + numInputBatches, + c2r) + .map(proj) + + val schema = + SparkShimLoader.getSparkShims.structFromAttributes(replacedAliasUdf.map(_.toAttribute)) + RowToVeloxColumnarExec.toColumnarBatchIterator( + rows, + schema, + numOutputRows, + numInputBatches, + r2c, + childData.numRows()) + // TODO: should check the size <= 1, but now it has bug, will change iterator to empty + } + + private def getProjectedBatchArrow( + childData: ColumnarBatch, + c2a: SQLMetric, + a2c: SQLMetric): Iterator[ColumnarBatch] = { + // select part of child output and child data + val proj = MutableProjection.create(replacedAliasUdf, 
projectAttributes) + val numRows = childData.numRows() + val start = System.currentTimeMillis() + val arrowBatch = if (childData.numCols() == 0) { childData } + else ColumnarBatches.load(ArrowBufferAllocators.contextInstance(), childData) + c2a += System.currentTimeMillis() - start + + val schema = + SparkShimLoader.getSparkShims.structFromAttributes(replacedAliasUdf.map(_.toAttribute)) + val vectors: Array[WritableColumnVector] = ArrowWritableColumnVector + .allocateColumns(numRows, schema) + .map { + vector => + vector.setValueCount(numRows) + vector + } + val targetRow = new MutableColumnarRow(vectors) + for (i <- 0 until numRows) { + targetRow.rowId = i + proj.target(targetRow).apply(arrowBatch.getRow(i)) + } + val targetBatch = new ColumnarBatch(vectors.map(_.asInstanceOf[ColumnVector]), numRows) + val start2 = System.currentTimeMillis() + val veloxBatch = + ColumnarBatches.offload(ArrowBufferAllocators.contextInstance(), targetBatch) + a2c += System.currentTimeMillis() - start2 + Iterators + .wrap(Iterator.single(veloxBatch)) + .recycleIterator({ + arrowBatch.close() + targetBatch.close() + }) + .create() + // TODO: should check the size <= 1, but now it has bug, will change iterator to empty + } + + override def verboseStringWithOperatorId(): String = { + s""" + |$formattedNodeName + |${ExplainUtils.generateFieldString("Output", output)} + |${ExplainUtils.generateFieldString("Input", child.output)} + |${ExplainUtils.generateFieldString("ScalaUDF", replacedAliasUdf)} Review Comment: nit: ScalaUDF -> UDF -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
