PHILO-HE commented on code in PR #5514: URL: https://github.com/apache/incubator-gluten/pull/5514#discussion_r1577418864
########## gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala: ########## @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.extension.columnar.validator + +import org.apache.gluten.GlutenConfig +import org.apache.gluten.backendsapi.{BackendsApiManager, BackendSettingsApi} +import org.apache.gluten.expression.ExpressionUtils +import org.apache.gluten.extension.columnar.{TRANSFORM_UNSUPPORTED, TransformHints} +import org.apache.gluten.sql.shims.SparkShimLoader + +import org.apache.spark.sql.execution._ +import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec} +import org.apache.spark.sql.execution.datasources.WriteFilesExec +import org.apache.spark.sql.execution.datasources.v2.BatchScanExec +import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec} +import org.apache.spark.sql.execution.joins._ +import org.apache.spark.sql.execution.window.WindowExec +import org.apache.spark.sql.hive.HiveTableScanExecTransformer + +import scala.collection.mutable +import scala.collection.mutable.ListBuffer + +object Validators { + def builder(): Builder = Builder() 
+ + class Builder private { + private val conf = GlutenConfig.getConf + private val settings = BackendsApiManager.getSettings + private val buffer: ListBuffer[Validator] = mutable.ListBuffer() + + /** Fails validation if a plan node was already tagged with TRANSFORM_UNSUPPORTED. */ + def fallbackByHint(): Builder = { + buffer += FallbackByHint + this + } + + /** + * Fails validation if a plan node includes an expression that is considered too complex to + * executed by native library. By default, we use a threshold option in config to make the + * decision. + */ + def fallbackComplexExpressions(): Builder = { + buffer += new FallbackComplexExpressions(conf.fallbackExpressionsThreshold) + this + } + + /** Fails validation on non-scan plan nodes if Gluten is running as scan-only mode. */ + def fallbackIfScanOnly(): Builder = { + buffer += new FallbackIfScanOnly(conf.enableScanOnly) + this + } + + /** + * Fails validation if native-execution of a plan node is not supported by current backend + * implementation by checking the active BackendSettings. + */ + def fallbackByBackendSettings(): Builder = { + buffer += new FallbackByBackendSettings(settings) + this + } + + /** + * Fails validation if native-execution of a plan node is disabled by Gluten/Spark + * configuration. + */ + def fallbackByUserOptions(): Builder = { + buffer += new FallbackByUserOptions(conf) + this + } + + /** Add a custom validator to pipeline. 
*/ + def add(validator: Validator): Builder = { + buffer += validator + this + } + + def build(): Validator = { + if (buffer.isEmpty) { + NoopValidator + } else { + new ValidatorPipeline(buffer) + } + } + } + + private object Builder { + def apply(): Builder = new Builder() + } + + private object FallbackByHint extends Validator { + override def validate(plan: SparkPlan): Validator.OutCome = { + if (TransformHints.isNotTransformable(plan)) { + val hint = TransformHints.getHint(plan).asInstanceOf[TRANSFORM_UNSUPPORTED] + return fail(hint.reason.getOrElse("Reason not recorded")) + } + pass() + } + } + + private class FallbackComplexExpressions(threshold: Int) extends Validator { + override def validate(plan: SparkPlan): Validator.OutCome = { + if (plan.expressions.exists(e => ExpressionUtils.getExpressionTreeDepth(e) > threshold)) { + return fail( + s"Disabled because at least one present expression exceeded depth threshold: " + + s"${plan.nodeName}") + } + pass() + } + } + + private class FallbackIfScanOnly(scanOnly: Boolean) extends Validator { + override def validate(plan: SparkPlan): Validator.OutCome = plan match { + case _: BatchScanExec => pass() + case _: FileSourceScanExec => pass() + case p if HiveTableScanExecTransformer.isHiveTableScan(p) => pass() + case p if scanOnly => fail(p) + case _ => pass() + } + } + + private class FallbackByBackendSettings(settings: BackendSettingsApi) extends Validator { + override def validate(plan: SparkPlan): Validator.OutCome = plan match { + case p: ShuffleExchangeExec if !settings.supportColumnarShuffleExec() => fail(p) + case p: SortMergeJoinExec if !settings.supportSortMergeJoinExec() => fail(p) + case p: WriteFilesExec + if !(settings.enableNativeWriteFiles() && settings.supportTransformWriteFiles) => + fail(p) + case p: SortAggregateExec if !settings.replaceSortAggWithHashAgg => Review Comment: Looks no need to keep `replaceSortAggWithHashAgg` in backend api now. 
Maybe move the check for `SortAggregateExec` (which depends on config) to `FallbackByUserOptions`? If that makes sense, it's OK to do the refactor in a follow-up PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
