jinchengchenghh commented on code in PR #7360:
URL: https://github.com/apache/incubator-gluten/pull/7360#discussion_r1801085082


##########
backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala:
##########
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.GlutenConfig
+import org.apache.gluten.columnarbatch.ColumnarBatches
+import org.apache.gluten.expression.ExpressionUtils
+import org.apache.gluten.extension.{GlutenPlan, ValidationResult}
+import org.apache.gluten.extension.columnar.validator.Validator.Passed
+import org.apache.gluten.iterator.Iterators
+import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
+import org.apache.gluten.sql.shims.SparkShimLoader
+import org.apache.gluten.vectorized.ArrowWritableColumnVector
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
AttributeReference, CaseWhen, Coalesce, Expression, If, LambdaFunction, 
MutableProjection, NamedExpression, NaNvl, ScalaUDF, UnsafeProjection}
+import org.apache.spark.sql.execution.{ExplainUtils, ProjectExec, SparkPlan, 
UnaryExecNode}
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.sql.execution.vectorized.{MutableColumnarRow, 
WritableColumnVector}
+import org.apache.spark.sql.hive.HiveUdfUtil
+import org.apache.spark.sql.types.{BinaryType, BooleanType, ByteType, 
DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, 
NullType, ShortType, StringType, TimestampType, YearMonthIntervalType}
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
+
+import scala.collection.mutable.ListBuffer
+
+/**
+ * By rule <PartialProjectRule>, a project that cannot be offloaded is rewritten into
+ * ProjectExecTransformer + ColumnarPartialProjectExec. E.g. for sum(myudf(a) + b + hash(c))
+ * with child output (a, b, c): ColumnarPartialProjectExec outputs (a, b, c, myudf(a)) and
+ * ProjectExecTransformer computes myudf(a) + b + hash(c).
+ *
+ * @param original
+ *   extract the ScalaUDF from original project list as Alias in 
UnsafeProjection and
+ *   AttributeReference in ColumnarPartialProjectExec output
+ * @param child
+ *   child plan
+ */
+case class ColumnarPartialProjectExec(original: ProjectExec, child: SparkPlan)(
+    replacedAliasUdf: ListBuffer[Alias])
+  extends UnaryExecNode
+  with GlutenPlan {
+
+  private val debug = GlutenConfig.getConf.debug
+
+  private val projectAttributes: ListBuffer[Attribute] = ListBuffer()
+  private val projectIndexInChild: ListBuffer[Int] = ListBuffer()
+  private var UDFAttrNotExists = false
+  private var hasUnsupportedDataType = replacedAliasUdf.exists(a => 
!validateDataType(a.dataType))
+  if (!hasUnsupportedDataType) {
+    getProjectIndexInChildOutput(replacedAliasUdf)
+  }
+
+  @transient override lazy val metrics = Map(
+    "time" -> SQLMetrics.createTimingMetric(sparkContext, "total time of 
partial project"),
+    "column_to_row_time" -> SQLMetrics.createTimingMetric(
+      sparkContext,
+      "time of velox to Arrow ColumnarBatch or UnsafeRow"),
+    "row_to_column_time" -> SQLMetrics.createTimingMetric(
+      sparkContext,
+      "time of Arrow ColumnarBatch or UnsafeRow to velox")
+  )
+
+  override def output: Seq[Attribute] = child.output ++ 
replacedAliasUdf.map(_.toAttribute)
+
+  final override def doExecute(): RDD[InternalRow] = {
+    throw new UnsupportedOperationException(
+      s"${this.getClass.getSimpleName} doesn't support doExecute")
+  }
+
+  final override protected def otherCopyArgs: Seq[AnyRef] = {
+    replacedAliasUdf :: Nil
+  }
+
+  final override val supportsColumnar: Boolean = true
+
+  private def validateExpression(expr: Expression): Boolean = {
+    expr.deterministic && !expr.isInstanceOf[LambdaFunction] && expr.children
+      .forall(validateExpression)
+  }
+
+  private def validateDataType(dataType: DataType): Boolean = {
+    dataType match {
+      case _: BooleanType => true
+      case _: ByteType => true
+      case _: ShortType => true
+      case _: IntegerType => true
+      case _: LongType => true
+      case _: FloatType => true
+      case _: DoubleType => true
+      case _: StringType => true
+      case _: TimestampType => true
+      case _: DateType => true
+      case _: BinaryType => true
+      case _: DecimalType => true
+      case YearMonthIntervalType.DEFAULT => true
+      case _: NullType => true
+      case _ => false

Review Comment:
   No. The input argument of `VeloxColumnarBatch::select` is the column index, 
so it supports complex data types only partially: udf(struct) is supported, but 
if the udf is udf(struct.int), we cannot select the column. In other words, we 
cannot resolve a table schema such as:
   ```
   struct
      int
      bigint
   int
   map
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to