This is an automated email from the ASF dual-hosted git repository.

mingliang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new f96105de85 [CORE] Simplify code of offload scan (#8144)
f96105de85 is described below

commit f96105de853ad5855b59953f4932c38b2860b05c
Author: Mingliang Zhu <[email protected]>
AuthorDate: Fri Dec 6 09:14:35 2024 +0800

    [CORE] Simplify code of offload scan (#8144)
---
 .../gluten/backendsapi/clickhouse/CHBackend.scala  |  2 +
 .../clickhouse/CHSparkPlanExecApi.scala            | 13 ++----
 .../gluten/backendsapi/BackendSettingsApi.scala    |  2 +
 .../gluten/backendsapi/SparkPlanExecApi.scala      |  5 +--
 .../execution/BatchScanExecTransformer.scala       |  4 ++
 .../CartesianProductExecTransformer.scala          |  8 ++++
 .../gluten/execution/ScanTransformerFactory.scala  | 52 ++++------------------
 .../columnar/offload/OffloadSingleNodeRules.scala  | 50 ++++-----------------
 .../extension/columnar/validator/Validators.scala  | 31 +++----------
 .../sql/hive/HiveTableScanExecTransformer.scala    | 23 +---------
 10 files changed, 47 insertions(+), 143 deletions(-)

diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
index e5eb91b69b..823ed74700 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
@@ -364,6 +364,8 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
 
   override def supportCartesianProductExec(): Boolean = true
 
+  override def supportCartesianProductExecWithCondition(): Boolean = false
+
   override def supportHashBuildJoinTypeOnLeft: JoinType => Boolean = {
     t =>
       if (super.supportHashBuildJoinTypeOnLeft(t)) {
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index dde03d4ad0..c2f91fa152 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -363,15 +363,10 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging {
       left: SparkPlan,
       right: SparkPlan,
       condition: Option[Expression]): CartesianProductExecTransformer =
-    if (!condition.isEmpty) {
-      throw new GlutenNotSupportException(
-        "CartesianProductExecTransformer with condition is not supported in ch backend.")
-    } else {
-      CartesianProductExecTransformer(
-        ColumnarCartesianProductBridge(left),
-        ColumnarCartesianProductBridge(right),
-        condition)
-    }
+    CartesianProductExecTransformer(
+      ColumnarCartesianProductBridge(left),
+      ColumnarCartesianProductBridge(right),
+      condition)
 
   override def genBroadcastNestedLoopJoinExecTransformer(
       left: SparkPlan,
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index 1eb69da6e5..0d5b6b36da 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -121,6 +121,8 @@ trait BackendSettingsApi {
 
   def supportCartesianProductExec(): Boolean = false
 
+  def supportCartesianProductExecWithCondition(): Boolean = true
+
   def supportBroadcastNestedLoopJoinExec(): Boolean = true
 
   def supportSampleExec(): Boolean = false
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
index 667c0bdc25..ec032af92d 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.execution.joins.BuildSideRelation
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.execution.python.ArrowEvalPythonExec
 import org.apache.spark.sql.execution.window._
-import org.apache.spark.sql.hive.{HiveTableScanExecTransformer, HiveUDFTransformer}
+import org.apache.spark.sql.hive.HiveUDFTransformer
 import org.apache.spark.sql.types.{DecimalType, LongType, NullType, StructType}
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
@@ -64,9 +64,6 @@ trait SparkPlanExecApi {
    */
   def genFilterExecTransformer(condition: Expression, child: SparkPlan): FilterExecTransformerBase
 
-  def genHiveTableScanExecTransformer(plan: SparkPlan): HiveTableScanExecTransformer =
-    HiveTableScanExecTransformer(plan)
-
   def genProjectExecTransformer(
       projectList: Seq[NamedExpression],
       child: SparkPlan): ProjectExecTransformer =
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
index e1a1be8e29..4f603e1024 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
@@ -134,6 +134,10 @@ abstract class BatchScanExecTransformerBase(
   }
 
   override def doValidateInternal(): ValidationResult = {
+    if (!ScanTransformerFactory.supportedBatchScan(scan)) {
+      return ValidationResult.failed(s"Unsupported scan $scan")
+    }
+
     if (pushedAggregate.nonEmpty) {
       return ValidationResult.failed(s"Unsupported aggregation push down for $scan.")
     }
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/CartesianProductExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/CartesianProductExecTransformer.scala
index 3e3169aa55..9e2f12bcf8 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/CartesianProductExecTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/CartesianProductExecTransformer.scala
@@ -111,6 +111,14 @@ case class CartesianProductExecTransformer(
   }
 
   override protected def doValidateInternal(): ValidationResult = {
+    if (
+      !BackendsApiManager.getSettings.supportCartesianProductExecWithCondition() &&
+      condition.nonEmpty
+    ) {
+      return ValidationResult.failed(
+        "CartesianProductExecTransformer with condition is not supported in this backend.")
+    }
+
     if (!BackendsApiManager.getSettings.supportCartesianProductExec()) {
       return ValidationResult.failed("Cartesian product is not supported in this backend")
     }
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala
index 52dad6da37..dfdf2d2f34 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala
@@ -16,12 +16,10 @@
  */
 package org.apache.gluten.execution
 
-import org.apache.gluten.exception.GlutenNotSupportException
-import org.apache.gluten.extension.columnar.FallbackTags
 import org.apache.gluten.sql.shims.SparkShimLoader
 
 import org.apache.spark.sql.connector.read.Scan
-import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
+import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan}
 
 import java.util.ServiceLoader
@@ -58,8 +56,7 @@ object ScanTransformerFactory {
     }
   }
 
-  private def lookupBatchScanTransformer(
-      batchScanExec: BatchScanExec): BatchScanExecTransformerBase = {
+  def createBatchScanTransformer(batchScanExec: BatchScanExec): BatchScanExecTransformerBase = {
     val scan = batchScanExec.scan
     lookupDataSourceScanTransformer(scan.getClass.getName) match {
       case Some(clz) =>
@@ -69,46 +66,16 @@ object ScanTransformerFactory {
           .asInstanceOf[DataSourceScanTransformerRegister]
           .createDataSourceV2Transformer(batchScanExec)
       case _ =>
-        scan match {
-          case _: FileScan =>
-            BatchScanExecTransformer(
-              batchScanExec.output,
-              batchScanExec.scan,
-              batchScanExec.runtimeFilters,
-              table = SparkShimLoader.getSparkShims.getBatchScanExecTable(batchScanExec)
-            )
-          case _ =>
-            throw new GlutenNotSupportException(s"Unsupported scan $scan")
-        }
-    }
-  }
-
-  def createBatchScanTransformer(
-      batchScan: BatchScanExec,
-      validation: Boolean = false): SparkPlan = {
-    if (supportedBatchScan(batchScan.scan)) {
-      val transformer = lookupBatchScanTransformer(batchScan)
-      if (!validation) {
-        val validationResult = transformer.doValidate()
-        if (validationResult.ok()) {
-          transformer
-        } else {
-          FallbackTags.add(batchScan, validationResult.reason())
-          batchScan
-        }
-      } else {
-        transformer
-      }
-    } else {
-      if (validation) {
-        throw new GlutenNotSupportException(s"Unsupported scan ${batchScan.scan}")
-      }
-      FallbackTags.add(batchScan, "The scan in BatchScanExec is not supported.")
-      batchScan
+        BatchScanExecTransformer(
+          batchScanExec.output,
+          batchScanExec.scan,
+          batchScanExec.runtimeFilters,
+          table = SparkShimLoader.getSparkShims.getBatchScanExecTable(batchScanExec)
+        )
     }
   }
 
-  private def supportedBatchScan(scan: Scan): Boolean = scan match {
+  def supportedBatchScan(scan: Scan): Boolean = scan match {
     case _: FileScan => true
     case _ => lookupDataSourceScanTransformer(scan.getClass.getName).nonEmpty
   }
@@ -132,5 +99,4 @@ object ScanTransformerFactory {
     )
     Option(clz)
   }
-
 }
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
index fa698cd244..7dc40faa43 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
@@ -18,7 +18,6 @@ package org.apache.gluten.extension.columnar.offload
 
 import org.apache.gluten.GlutenConfig
 import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.exception.GlutenNotSupportException
 import org.apache.gluten.execution._
 import org.apache.gluten.extension.columnar.FallbackTags
 import org.apache.gluten.logging.LogLevelUtil
@@ -178,7 +177,7 @@ object OffloadJoin {
 // Other transformations.
 case class OffloadOthers() extends OffloadSingleNode with LogLevelUtil {
   import OffloadOthers._
-  private val replace = new ReplaceSingleNode()
+  private val replace = new ReplaceSingleNode
 
   override def offload(plan: SparkPlan): SparkPlan = replace.doReplace(plan)
 }
@@ -190,7 +189,7 @@ object OffloadOthers {
   // Do not look up on children on the input node in this rule. Otherwise
   // it may break RAS which would group all the possible input nodes to
   // search for validate candidates.
-  private class ReplaceSingleNode() extends LogLevelUtil with Logging {
+  private class ReplaceSingleNode extends LogLevelUtil with Logging {
 
     def doReplace(p: SparkPlan): SparkPlan = {
       val plan = p
@@ -199,11 +198,15 @@ object OffloadOthers {
       }
       plan match {
         case plan: BatchScanExec =>
-          applyScanTransformer(plan)
+          logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
+          ScanTransformerFactory.createBatchScanTransformer(plan)
         case plan: FileSourceScanExec =>
-          applyScanTransformer(plan)
+          logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
+          ScanTransformerFactory.createFileSourceScanTransformer(plan)
         case plan if HiveTableScanExecTransformer.isHiveTableScan(plan) =>
-          applyScanTransformer(plan)
+          // TODO: Add DynamicPartitionPruningHiveScanSuite.scala
+          logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
+          HiveTableScanExecTransformer(plan)
         case plan: CoalesceExec =>
           logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
           ColumnarCoalesceExec(plan.numPartitions, plan.child)
@@ -333,40 +336,5 @@ object OffloadOthers {
         case other => other
       }
     }
-
-    /**
-     * Apply scan transformer for file source and batch source,
-     *   1. create new filter and scan transformer, 2. validate, tag new scan as unsupported if
-     *      failed, 3. return new source.
-     */
-    private def applyScanTransformer(plan: SparkPlan): SparkPlan = plan match {
-      case plan: FileSourceScanExec =>
-        val transformer = ScanTransformerFactory.createFileSourceScanTransformer(plan)
-        val validationResult = transformer.doValidate()
-        if (validationResult.ok()) {
-          logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
-          transformer
-        } else {
-          logDebug(s"Columnar Processing for ${plan.getClass} is currently unsupported.")
-          FallbackTags.add(plan, validationResult.reason())
-          plan
-        }
-      case plan: BatchScanExec =>
-        ScanTransformerFactory.createBatchScanTransformer(plan)
-      case plan if HiveTableScanExecTransformer.isHiveTableScan(plan) =>
-        // TODO: Add DynamicPartitionPruningHiveScanSuite.scala
-        val hiveTableScanExecTransformer =
-          BackendsApiManager.getSparkPlanExecApiInstance.genHiveTableScanExecTransformer(plan)
-        val validateResult = hiveTableScanExecTransformer.doValidate()
-        if (validateResult.ok()) {
-          logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
-          return hiveTableScanExecTransformer
-        }
-        logDebug(s"Columnar Processing for ${plan.getClass} is currently unsupported.")
-        FallbackTags.add(plan, validateResult.reason())
-        plan
-      case other =>
-        throw new GlutenNotSupportException(s"${other.getClass.toString} is not supported.")
-    }
   }
 }
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
index 7e7d732c29..d246167bd7 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
@@ -18,7 +18,6 @@ package org.apache.gluten.extension.columnar.validator
 
 import org.apache.gluten.GlutenConfig
 import org.apache.gluten.backendsapi.{BackendsApiManager, BackendSettingsApi}
-import org.apache.gluten.exception.GlutenNotSupportException
 import org.apache.gluten.execution._
 import org.apache.gluten.expression.ExpressionUtils
 import org.apache.gluten.extension.columnar.FallbackTags
@@ -95,7 +94,7 @@ object Validators {
      * native validation failed.
      */
     def fallbackByNativeValidation(): Validator.Builder = {
-      builder.add(new FallbackByNativeValidation())
+      builder.add(new FallbackByNativeValidation)
     }
   }
 
@@ -223,34 +222,16 @@ object Validators {
     }
   }
 
-  private class FallbackByNativeValidation() extends Validator with Logging {
-    override def validate(plan: SparkPlan): Validator.OutCome = {
-      try {
-        validate0(plan)
-      } catch {
-        case e @ (_: GlutenNotSupportException | _: UnsupportedOperationException) =>
-          if (!e.isInstanceOf[GlutenNotSupportException]) {
-            logDebug("Just a warning. This exception perhaps needs to be fixed.", e)
-          }
-          fail(
-            s"${e.getMessage}, original Spark plan is " +
-              s"${plan.getClass}(${plan.children.toList.map(_.getClass)})")
-      }
-    }
-
-    private def validate0(plan: SparkPlan): Validator.OutCome = plan match {
+  private class FallbackByNativeValidation extends Validator with Logging {
+    override def validate(plan: SparkPlan): Validator.OutCome = plan match {
       case plan: BatchScanExec =>
-        val transformer =
-          ScanTransformerFactory
-            .createBatchScanTransformer(plan, validation = true)
-            .asInstanceOf[BasicScanExecTransformer]
+        val transformer = ScanTransformerFactory.createBatchScanTransformer(plan)
         transformer.doValidate().toValidatorOutcome()
       case plan: FileSourceScanExec =>
-        val transformer =
-          ScanTransformerFactory.createFileSourceScanTransformer(plan)
+        val transformer = ScanTransformerFactory.createFileSourceScanTransformer(plan)
         transformer.doValidate().toValidatorOutcome()
       case plan if HiveTableScanExecTransformer.isHiveTableScan(plan) =>
-        HiveTableScanExecTransformer.validate(plan).toValidatorOutcome()
+        HiveTableScanExecTransformer(plan).doValidate().toValidatorOutcome()
       case plan: ProjectExec =>
         val transformer = ProjectExecTransformer(plan.projectList, plan.child)
         transformer.doValidate().toValidatorOutcome()
diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
index f701c76b18..6911672376 100644
--- a/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
@@ -18,7 +18,6 @@ package org.apache.spark.sql.hive
 
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.execution.BasicScanExecTransformer
-import org.apache.gluten.extension.ValidationResult
 import org.apache.gluten.metrics.MetricsUpdater
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 
@@ -181,8 +180,8 @@ case class HiveTableScanExecTransformer(
 
 object HiveTableScanExecTransformer {
 
-  val NULL_VALUE: Char = 0x00
-  val DEFAULT_FIELD_DELIMITER: Char = 0x01
+  private val NULL_VALUE: Char = 0x00
+  private val DEFAULT_FIELD_DELIMITER: Char = 0x01
   val TEXT_INPUT_FORMAT_CLASS: Class[TextInputFormat] =
     Utils.classForName("org.apache.hadoop.mapred.TextInputFormat")
   val ORC_INPUT_FORMAT_CLASS: Class[OrcInputFormat] =
@@ -193,24 +192,6 @@ object HiveTableScanExecTransformer {
     plan.isInstanceOf[HiveTableScanExec]
   }
 
-  def copyWith(plan: SparkPlan, newPartitionFilters: Seq[Expression]): SparkPlan = {
-    val hiveTableScanExec = plan.asInstanceOf[HiveTableScanExec]
-    hiveTableScanExec.copy(partitionPruningPred = newPartitionFilters)(sparkSession =
-      hiveTableScanExec.session)
-  }
-
-  def validate(plan: SparkPlan): ValidationResult = {
-    plan match {
-      case hiveTableScan: HiveTableScanExec =>
-        val hiveTableScanTransformer = new HiveTableScanExecTransformer(
-          hiveTableScan.requestedAttributes,
-          hiveTableScan.relation,
-          hiveTableScan.partitionPruningPred)(hiveTableScan.session)
-        hiveTableScanTransformer.doValidate()
-      case _ => ValidationResult.failed("Is not a Hive scan")
-    }
-  }
-
   def apply(plan: SparkPlan): HiveTableScanExecTransformer = {
     plan match {
       case hiveTableScan: HiveTableScanExec =>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to