This is an automated email from the ASF dual-hosted git repository.
mingliang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new f96105de85 [CORE] Simplify code of offload scan (#8144)
f96105de85 is described below
commit f96105de853ad5855b59953f4932c38b2860b05c
Author: Mingliang Zhu <[email protected]>
AuthorDate: Fri Dec 6 09:14:35 2024 +0800
[CORE] Simplify code of offload scan (#8144)
---
.../gluten/backendsapi/clickhouse/CHBackend.scala | 2 +
.../clickhouse/CHSparkPlanExecApi.scala | 13 ++----
.../gluten/backendsapi/BackendSettingsApi.scala | 2 +
.../gluten/backendsapi/SparkPlanExecApi.scala | 5 +--
.../execution/BatchScanExecTransformer.scala | 4 ++
.../CartesianProductExecTransformer.scala | 8 ++++
.../gluten/execution/ScanTransformerFactory.scala | 52 ++++------------------
.../columnar/offload/OffloadSingleNodeRules.scala | 50 ++++-----------------
.../extension/columnar/validator/Validators.scala | 31 +++----------
.../sql/hive/HiveTableScanExecTransformer.scala | 23 +---------
10 files changed, 47 insertions(+), 143 deletions(-)
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
index e5eb91b69b..823ed74700 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
@@ -364,6 +364,8 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
override def supportCartesianProductExec(): Boolean = true
+ override def supportCartesianProductExecWithCondition(): Boolean = false
+
override def supportHashBuildJoinTypeOnLeft: JoinType => Boolean = {
t =>
if (super.supportHashBuildJoinTypeOnLeft(t)) {
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index dde03d4ad0..c2f91fa152 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -363,15 +363,10 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging {
left: SparkPlan,
right: SparkPlan,
condition: Option[Expression]): CartesianProductExecTransformer =
- if (!condition.isEmpty) {
- throw new GlutenNotSupportException(
- "CartesianProductExecTransformer with condition is not supported in ch backend.")
- } else {
- CartesianProductExecTransformer(
- ColumnarCartesianProductBridge(left),
- ColumnarCartesianProductBridge(right),
- condition)
- }
+ CartesianProductExecTransformer(
+ ColumnarCartesianProductBridge(left),
+ ColumnarCartesianProductBridge(right),
+ condition)
override def genBroadcastNestedLoopJoinExecTransformer(
left: SparkPlan,
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index 1eb69da6e5..0d5b6b36da 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -121,6 +121,8 @@ trait BackendSettingsApi {
def supportCartesianProductExec(): Boolean = false
+ def supportCartesianProductExecWithCondition(): Boolean = true
+
def supportBroadcastNestedLoopJoinExec(): Boolean = true
def supportSampleExec(): Boolean = false
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
index 667c0bdc25..ec032af92d 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.execution.joins.BuildSideRelation
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.execution.python.ArrowEvalPythonExec
import org.apache.spark.sql.execution.window._
-import org.apache.spark.sql.hive.{HiveTableScanExecTransformer, HiveUDFTransformer}
+import org.apache.spark.sql.hive.HiveUDFTransformer
import org.apache.spark.sql.types.{DecimalType, LongType, NullType, StructType}
import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -64,9 +64,6 @@ trait SparkPlanExecApi {
*/
def genFilterExecTransformer(condition: Expression, child: SparkPlan): FilterExecTransformerBase
- def genHiveTableScanExecTransformer(plan: SparkPlan): HiveTableScanExecTransformer =
- HiveTableScanExecTransformer(plan)
-
def genProjectExecTransformer(
projectList: Seq[NamedExpression],
child: SparkPlan): ProjectExecTransformer =
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
index e1a1be8e29..4f603e1024 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
@@ -134,6 +134,10 @@ abstract class BatchScanExecTransformerBase(
}
override def doValidateInternal(): ValidationResult = {
+ if (!ScanTransformerFactory.supportedBatchScan(scan)) {
+ return ValidationResult.failed(s"Unsupported scan $scan")
+ }
+
if (pushedAggregate.nonEmpty) {
return ValidationResult.failed(s"Unsupported aggregation push down for $scan.")
}
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/CartesianProductExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/CartesianProductExecTransformer.scala
index 3e3169aa55..9e2f12bcf8 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/CartesianProductExecTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/CartesianProductExecTransformer.scala
@@ -111,6 +111,14 @@ case class CartesianProductExecTransformer(
}
override protected def doValidateInternal(): ValidationResult = {
+ if (
+ !BackendsApiManager.getSettings.supportCartesianProductExecWithCondition() &&
+ condition.nonEmpty
+ ) {
+ return ValidationResult.failed(
+ "CartesianProductExecTransformer with condition is not supported in this backend.")
+ }
+
if (!BackendsApiManager.getSettings.supportCartesianProductExec()) {
return ValidationResult.failed("Cartesian product is not supported in this backend")
}
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala
index 52dad6da37..dfdf2d2f34 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala
@@ -16,12 +16,10 @@
*/
package org.apache.gluten.execution
-import org.apache.gluten.exception.GlutenNotSupportException
-import org.apache.gluten.extension.columnar.FallbackTags
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.spark.sql.connector.read.Scan
-import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
+import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan}
import java.util.ServiceLoader
@@ -58,8 +56,7 @@ object ScanTransformerFactory {
}
}
- private def lookupBatchScanTransformer(
- batchScanExec: BatchScanExec): BatchScanExecTransformerBase = {
+ def createBatchScanTransformer(batchScanExec: BatchScanExec): BatchScanExecTransformerBase = {
val scan = batchScanExec.scan
lookupDataSourceScanTransformer(scan.getClass.getName) match {
case Some(clz) =>
@@ -69,46 +66,16 @@ object ScanTransformerFactory {
.asInstanceOf[DataSourceScanTransformerRegister]
.createDataSourceV2Transformer(batchScanExec)
case _ =>
- scan match {
- case _: FileScan =>
- BatchScanExecTransformer(
- batchScanExec.output,
- batchScanExec.scan,
- batchScanExec.runtimeFilters,
- table = SparkShimLoader.getSparkShims.getBatchScanExecTable(batchScanExec)
- )
- case _ =>
- throw new GlutenNotSupportException(s"Unsupported scan $scan")
- }
- }
- }
-
- def createBatchScanTransformer(
- batchScan: BatchScanExec,
- validation: Boolean = false): SparkPlan = {
- if (supportedBatchScan(batchScan.scan)) {
- val transformer = lookupBatchScanTransformer(batchScan)
- if (!validation) {
- val validationResult = transformer.doValidate()
- if (validationResult.ok()) {
- transformer
- } else {
- FallbackTags.add(batchScan, validationResult.reason())
- batchScan
- }
- } else {
- transformer
- }
- } else {
- if (validation) {
- throw new GlutenNotSupportException(s"Unsupported scan ${batchScan.scan}")
- }
- FallbackTags.add(batchScan, "The scan in BatchScanExec is not supported.")
- batchScan
+ BatchScanExecTransformer(
+ batchScanExec.output,
+ batchScanExec.scan,
+ batchScanExec.runtimeFilters,
+ table = SparkShimLoader.getSparkShims.getBatchScanExecTable(batchScanExec)
+ )
}
}
- private def supportedBatchScan(scan: Scan): Boolean = scan match {
+ def supportedBatchScan(scan: Scan): Boolean = scan match {
case _: FileScan => true
case _ => lookupDataSourceScanTransformer(scan.getClass.getName).nonEmpty
}
@@ -132,5 +99,4 @@ object ScanTransformerFactory {
)
Option(clz)
}
-
}
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
index fa698cd244..7dc40faa43 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
@@ -18,7 +18,6 @@ package org.apache.gluten.extension.columnar.offload
import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.exception.GlutenNotSupportException
import org.apache.gluten.execution._
import org.apache.gluten.extension.columnar.FallbackTags
import org.apache.gluten.logging.LogLevelUtil
@@ -178,7 +177,7 @@ object OffloadJoin {
// Other transformations.
case class OffloadOthers() extends OffloadSingleNode with LogLevelUtil {
import OffloadOthers._
- private val replace = new ReplaceSingleNode()
+ private val replace = new ReplaceSingleNode
override def offload(plan: SparkPlan): SparkPlan = replace.doReplace(plan)
}
@@ -190,7 +189,7 @@ object OffloadOthers {
// Do not look up on children on the input node in this rule. Otherwise
// it may break RAS which would group all the possible input nodes to
// search for validate candidates.
- private class ReplaceSingleNode() extends LogLevelUtil with Logging {
+ private class ReplaceSingleNode extends LogLevelUtil with Logging {
def doReplace(p: SparkPlan): SparkPlan = {
val plan = p
@@ -199,11 +198,15 @@ object OffloadOthers {
}
plan match {
case plan: BatchScanExec =>
- applyScanTransformer(plan)
+ logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
+ ScanTransformerFactory.createBatchScanTransformer(plan)
case plan: FileSourceScanExec =>
- applyScanTransformer(plan)
+ logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
+ ScanTransformerFactory.createFileSourceScanTransformer(plan)
case plan if HiveTableScanExecTransformer.isHiveTableScan(plan) =>
- applyScanTransformer(plan)
+ // TODO: Add DynamicPartitionPruningHiveScanSuite.scala
+ logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
+ HiveTableScanExecTransformer(plan)
case plan: CoalesceExec =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
ColumnarCoalesceExec(plan.numPartitions, plan.child)
@@ -333,40 +336,5 @@ object OffloadOthers {
case other => other
}
}
-
- /**
- * Apply scan transformer for file source and batch source,
- * 1. create new filter and scan transformer, 2. validate, tag new scan as unsupported if
- * failed, 3. return new source.
- */
- private def applyScanTransformer(plan: SparkPlan): SparkPlan = plan match {
- case plan: FileSourceScanExec =>
- val transformer = ScanTransformerFactory.createFileSourceScanTransformer(plan)
- val validationResult = transformer.doValidate()
- if (validationResult.ok()) {
- logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
- transformer
- } else {
- logDebug(s"Columnar Processing for ${plan.getClass} is currently unsupported.")
- FallbackTags.add(plan, validationResult.reason())
- plan
- }
- case plan: BatchScanExec =>
- ScanTransformerFactory.createBatchScanTransformer(plan)
- case plan if HiveTableScanExecTransformer.isHiveTableScan(plan) =>
- // TODO: Add DynamicPartitionPruningHiveScanSuite.scala
- val hiveTableScanExecTransformer =
- BackendsApiManager.getSparkPlanExecApiInstance.genHiveTableScanExecTransformer(plan)
- val validateResult = hiveTableScanExecTransformer.doValidate()
- if (validateResult.ok()) {
- logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
- return hiveTableScanExecTransformer
- }
- logDebug(s"Columnar Processing for ${plan.getClass} is currently unsupported.")
- FallbackTags.add(plan, validateResult.reason())
- plan
- case other =>
- throw new GlutenNotSupportException(s"${other.getClass.toString} is not supported.")
- }
}
}
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
index 7e7d732c29..d246167bd7 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
@@ -18,7 +18,6 @@ package org.apache.gluten.extension.columnar.validator
import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.{BackendsApiManager, BackendSettingsApi}
-import org.apache.gluten.exception.GlutenNotSupportException
import org.apache.gluten.execution._
import org.apache.gluten.expression.ExpressionUtils
import org.apache.gluten.extension.columnar.FallbackTags
@@ -95,7 +94,7 @@ object Validators {
* native validation failed.
*/
def fallbackByNativeValidation(): Validator.Builder = {
- builder.add(new FallbackByNativeValidation())
+ builder.add(new FallbackByNativeValidation)
}
}
@@ -223,34 +222,16 @@ object Validators {
}
}
- private class FallbackByNativeValidation() extends Validator with Logging {
- override def validate(plan: SparkPlan): Validator.OutCome = {
- try {
- validate0(plan)
- } catch {
- case e @ (_: GlutenNotSupportException | _: UnsupportedOperationException) =>
- if (!e.isInstanceOf[GlutenNotSupportException]) {
- logDebug("Just a warning. This exception perhaps needs to be fixed.", e)
- }
- fail(
- s"${e.getMessage}, original Spark plan is " +
- s"${plan.getClass}(${plan.children.toList.map(_.getClass)})")
- }
- }
-
- private def validate0(plan: SparkPlan): Validator.OutCome = plan match {
+ private class FallbackByNativeValidation extends Validator with Logging {
+ override def validate(plan: SparkPlan): Validator.OutCome = plan match {
case plan: BatchScanExec =>
- val transformer =
- ScanTransformerFactory
- .createBatchScanTransformer(plan, validation = true)
- .asInstanceOf[BasicScanExecTransformer]
+ val transformer = ScanTransformerFactory.createBatchScanTransformer(plan)
transformer.doValidate().toValidatorOutcome()
case plan: FileSourceScanExec =>
- val transformer =
- ScanTransformerFactory.createFileSourceScanTransformer(plan)
+ val transformer = ScanTransformerFactory.createFileSourceScanTransformer(plan)
transformer.doValidate().toValidatorOutcome()
case plan if HiveTableScanExecTransformer.isHiveTableScan(plan) =>
- HiveTableScanExecTransformer.validate(plan).toValidatorOutcome()
+ HiveTableScanExecTransformer(plan).doValidate().toValidatorOutcome()
case plan: ProjectExec =>
val transformer = ProjectExecTransformer(plan.projectList, plan.child)
transformer.doValidate().toValidatorOutcome()
diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
index f701c76b18..6911672376 100644
--- a/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
@@ -18,7 +18,6 @@ package org.apache.spark.sql.hive
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.execution.BasicScanExecTransformer
-import org.apache.gluten.extension.ValidationResult
import org.apache.gluten.metrics.MetricsUpdater
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
@@ -181,8 +180,8 @@ case class HiveTableScanExecTransformer(
object HiveTableScanExecTransformer {
- val NULL_VALUE: Char = 0x00
- val DEFAULT_FIELD_DELIMITER: Char = 0x01
+ private val NULL_VALUE: Char = 0x00
+ private val DEFAULT_FIELD_DELIMITER: Char = 0x01
val TEXT_INPUT_FORMAT_CLASS: Class[TextInputFormat] =
Utils.classForName("org.apache.hadoop.mapred.TextInputFormat")
val ORC_INPUT_FORMAT_CLASS: Class[OrcInputFormat] =
@@ -193,24 +192,6 @@ object HiveTableScanExecTransformer {
plan.isInstanceOf[HiveTableScanExec]
}
- def copyWith(plan: SparkPlan, newPartitionFilters: Seq[Expression]): SparkPlan = {
- val hiveTableScanExec = plan.asInstanceOf[HiveTableScanExec]
- hiveTableScanExec.copy(partitionPruningPred = newPartitionFilters)(sparkSession =
- hiveTableScanExec.session)
- }
-
- def validate(plan: SparkPlan): ValidationResult = {
- plan match {
- case hiveTableScan: HiveTableScanExec =>
- val hiveTableScanTransformer = new HiveTableScanExecTransformer(
- hiveTableScan.requestedAttributes,
- hiveTableScan.relation,
- hiveTableScan.partitionPruningPred)(hiveTableScan.session)
- hiveTableScanTransformer.doValidate()
- case _ => ValidationResult.failed("Is not a Hive scan")
- }
- }
-
def apply(plan: SparkPlan): HiveTableScanExecTransformer = {
plan match {
case hiveTableScan: HiveTableScanExec =>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]