This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 67ba92b90a [CORE][CH] Remove API BackendSettingsApi#supportShuffleWithProject (#8009)
67ba92b90a is described below
commit 67ba92b90a097eb369819030581d4157e7084e0b
Author: Hongze Zhang <[email protected]>
AuthorDate: Thu Nov 21 11:02:00 2024 +0800
[CORE][CH] Remove API BackendSettingsApi#supportShuffleWithProject (#8009)
---
.../gluten/backendsapi/clickhouse/CHBackend.scala | 23 -------------
.../clickhouse/CHSparkPlanExecApi.scala | 4 +--
.../backendsapi/clickhouse/CHValidatorApi.scala | 39 ++++++++++++++++++----
.../utils/RangePartitionerBoundsGenerator.scala | 7 ++--
.../gluten/backendsapi/BackendSettingsApi.scala | 8 -----
5 files changed, 38 insertions(+), 43 deletions(-)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
index 98d1eabe57..061ec9856e 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
@@ -34,9 +34,6 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans._
-import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning,
Partitioning}
-import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
@@ -255,26 +252,6 @@ object CHBackendSettings extends BackendSettingsApi with
Logging {
}
}
- override def supportShuffleWithProject(
- outputPartitioning: Partitioning,
- child: SparkPlan): Boolean = {
- child match {
- case hash: HashAggregateExec =>
- if (hash.aggregateExpressions.isEmpty) {
- true
- } else {
- outputPartitioning match {
- case hashPartitioning: HashPartitioning =>
- hashPartitioning.expressions.exists(x =>
!x.isInstanceOf[AttributeReference])
- case _ =>
- false
- }
- }
- case _ =>
- true
- }
- }
-
override def supportSortExec(): Boolean = {
GlutenConfig.getConf.enableColumnarSort
}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index e413cb8ed4..dde03d4ad0 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -273,9 +273,7 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with
Logging {
override def genColumnarShuffleExchange(shuffle: ShuffleExchangeExec):
SparkPlan = {
val child = shuffle.child
- if (
-
BackendsApiManager.getSettings.supportShuffleWithProject(shuffle.outputPartitioning,
child)
- ) {
+ if (CHValidatorApi.supportShuffleWithProject(shuffle.outputPartitioning,
child)) {
val (projectColumnNumber, newPartitioning, newChild) =
addProjectionForShuffleExchange(shuffle)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHValidatorApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHValidatorApi.scala
index eed493cffe..29d26410b9 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHValidatorApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHValidatorApi.scala
@@ -17,7 +17,7 @@
package org.apache.gluten.backendsapi.clickhouse
import org.apache.gluten.GlutenConfig
-import org.apache.gluten.backendsapi.{BackendsApiManager, ValidatorApi}
+import org.apache.gluten.backendsapi.ValidatorApi
import org.apache.gluten.expression.ExpressionConverter
import org.apache.gluten.extension.ValidationResult
import org.apache.gluten.substrait.SubstraitContext
@@ -28,12 +28,14 @@ import
org.apache.gluten.vectorized.CHNativeExpressionEvaluator
import org.apache.spark.internal.Logging
import org.apache.spark.shuffle.utils.RangePartitionerBoundsGenerator
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference, Expression}
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning,
Partitioning, RangePartitioning}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.execution.aggregate.HashAggregateExec
class CHValidatorApi extends ValidatorApi with AdaptiveSparkPlanHelper with
Logging {
+ import CHValidatorApi._
override def doNativeValidateWithFailureReason(plan: PlanNode):
ValidationResult = {
if (CHNativeExpressionEvaluator.doValidate(plan.toProtobuf.toByteArray)) {
@@ -86,10 +88,7 @@ class CHValidatorApi extends ValidatorApi with
AdaptiveSparkPlanHelper with Logg
.doTransform(substraitContext.registeredFunction)
node.isInstanceOf[SelectionNode]
}
- if (
- allSelectionNodes ||
-
BackendsApiManager.getSettings.supportShuffleWithProject(outputPartitioning,
child)
- ) {
+ if (allSelectionNodes || supportShuffleWithProject(outputPartitioning,
child)) {
None
} else {
Some("expressions are not supported in HashPartitioning")
@@ -107,3 +106,31 @@ class CHValidatorApi extends ValidatorApi with
AdaptiveSparkPlanHelper with Logg
}
}
}
+
+object CHValidatorApi {
+
+ /**
+ * A shuffle key may be an expression. We would add a projection for this
expression shuffle key
+ * and make it into a new column which the shuffle will refer to. But we
need to remove it from
+ * the result columns from the shuffle.
+ *
+ * Since https://github.com/apache/incubator-gluten/pull/1071.
+ */
+ def supportShuffleWithProject(outputPartitioning: Partitioning, child:
SparkPlan): Boolean = {
+ child match {
+ case hash: HashAggregateExec =>
+ if (hash.aggregateExpressions.isEmpty) {
+ true
+ } else {
+ outputPartitioning match {
+ case hashPartitioning: HashPartitioning =>
+ hashPartitioning.expressions.exists(x =>
!x.isInstanceOf[AttributeReference])
+ case _ =>
+ false
+ }
+ }
+ case _ =>
+ true
+ }
+ }
+}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/utils/RangePartitionerBoundsGenerator.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/utils/RangePartitionerBoundsGenerator.scala
index 87c6ae343d..706cc5f341 100644
---
a/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/utils/RangePartitionerBoundsGenerator.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/utils/RangePartitionerBoundsGenerator.scala
@@ -16,7 +16,7 @@
*/
package org.apache.spark.shuffle.utils
-import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.backendsapi.clickhouse.CHValidatorApi
import org.apache.gluten.execution.SortExecTransformer
import org.apache.gluten.expression.ExpressionConverter
import org.apache.gluten.substrait.SubstraitContext
@@ -261,8 +261,9 @@ object RangePartitionerBoundsGenerator {
break
}
if (
- !ordering.child.isInstanceOf[Attribute] &&
!BackendsApiManager.getSettings
- .supportShuffleWithProject(rangePartitioning, child)
+ !ordering.child.isInstanceOf[Attribute] &&
!CHValidatorApi.supportShuffleWithProject(
+ rangePartitioning,
+ child)
) {
enableRangePartitioning = false
break
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index 177d19c0c7..506424b79c 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -23,8 +23,6 @@ import
org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression}
import org.apache.spark.sql.catalyst.plans._
-import org.apache.spark.sql.catalyst.plans.physical.Partitioning
-import org.apache.spark.sql.execution.SparkPlan
import
org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand
import org.apache.spark.sql.execution.datasources.{FileFormat,
InsertIntoHadoopFsRelationCommand}
import org.apache.spark.sql.types.StructField
@@ -76,12 +74,6 @@ trait BackendSettingsApi {
def recreateJoinExecOnFallback(): Boolean = false
- /**
- * A shuffle key may be an expression. We would add a projection for this
expression shuffle key
- * and make it into a new column which the shuffle will refer to. But we
need to remove it from
- * the result columns from the shuffle.
- */
- def supportShuffleWithProject(outputPartitioning: Partitioning, child:
SparkPlan): Boolean = false
def excludeScanExecFromCollapsedStage(): Boolean = false
def rescaleDecimalArithmetic: Boolean = false
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]