This is an automated email from the ASF dual-hosted git repository.
chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new d526528fa2 [VL] Add RewriteCreateTableAsSelect for spark34 (#10646)
d526528fa2 is described below
commit d526528fa273802f5d906f53a3511ed9a42af973
Author: Zouxxyy <[email protected]>
AuthorDate: Mon Sep 15 16:34:33 2025 +0800
[VL] Add RewriteCreateTableAsSelect for spark34 (#10646)
Spark 3.5 already supports columnar v2 write for CTAS, since
[SPARK-43088][SQL] "Respect RequiresDistributionAndOrdering in
CTAS/RTAS" (spark#40734) was introduced there; this PR is essentially a copy of that change for Spark 3.4.
---
.../execution/enhanced/VeloxIcebergSuite.scala | 27 +++
.../org/apache/spark/sql/gluten/TestUtils.scala | 46 +++++
.../gluten/backendsapi/velox/VeloxRuleApi.scala | 5 +-
.../org/apache/gluten/sql/shims/SparkShims.scala | 2 +
.../gluten/sql/shims/spark34/Spark34Shims.scala | 4 +
.../sql/extension/RewriteCreateTableAsSelect.scala | 202 +++++++++++++++++++++
6 files changed, 285 insertions(+), 1 deletion(-)
diff --git
a/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
b/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
index 44ee905d01..a7dfe4e79e 100644
---
a/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
+++
b/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
@@ -19,7 +19,9 @@ package org.apache.gluten.execution.enhanced
import org.apache.gluten.execution.{IcebergSuite, VeloxIcebergAppendDataExec,
VeloxIcebergOverwriteByExpressionExec, VeloxIcebergReplaceDataExec}
import org.apache.gluten.tags.EnhancedFeaturesTest
+import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.CommandResultExec
+import org.apache.spark.sql.gluten.TestUtils
@EnhancedFeaturesTest
class VeloxIcebergSuite extends IcebergSuite {
@@ -190,4 +192,29 @@ class VeloxIcebergSuite extends IcebergSuite {
assert(result(0).get(0) == 2)
}
}
+
+ test("iceberg create table as select") {
+ withTable("iceberg_tb1", "iceberg_tb2") {
+ spark.sql("""
+ |create table iceberg_tb1 (a int, pt int) using iceberg
+ |partitioned by (pt)
+ |""".stripMargin)
+
+ spark.sql("insert into table iceberg_tb1 values (1, 1), (2, 2)")
+
+ // CTAS
+ val sqlStr = """
+ |create table iceberg_tb2 using iceberg
+ |partitioned by (pt)
+ |as select * from iceberg_tb1
+ |""".stripMargin
+
+ TestUtils.checkExecutedPlanContains[VeloxIcebergAppendDataExec](spark,
sqlStr)
+
+ checkAnswer(
+ spark.sql("select * from iceberg_tb2 order by a"),
+ Seq(Row(1, 1), Row(2, 2))
+ )
+ }
+ }
}
diff --git
a/backends-velox/src-iceberg/test/scala/org/apache/spark/sql/gluten/TestUtils.scala
b/backends-velox/src-iceberg/test/scala/org/apache/spark/sql/gluten/TestUtils.scala
new file mode 100644
index 0000000000..587c064b9c
--- /dev/null
+++
b/backends-velox/src-iceberg/test/scala/org/apache/spark/sql/gluten/TestUtils.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.gluten
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.util.QueryExecutionListener
+
+import scala.reflect.ClassTag
+
+object TestUtils {
+
+ def checkExecutedPlanContains[T: ClassTag](spark: SparkSession, sqlStr:
String): Unit = {
+ var found = false
+ val queryListener = new QueryExecutionListener {
+ override def onFailure(f: String, qe: QueryExecution, e: Exception):
Unit = {}
+ override def onSuccess(funcName: String, qe: QueryExecution, duration:
Long): Unit = {
+ if (!found) {
+ found =
qe.executedPlan.find(implicitly[ClassTag[T]].runtimeClass.isInstance(_)).isDefined
+ }
+ }
+ }
+ try {
+ spark.listenerManager.register(queryListener)
+ spark.sql(sqlStr)
+ spark.sparkContext.listenerBus.waitUntilEmpty()
+ assert(found)
+ } finally {
+ spark.listenerManager.unregister(queryListener)
+ }
+ }
+}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
index 45abbd5a55..50fced37ea 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.backendsapi.velox
-import org.apache.gluten.backendsapi.RuleApi
+import org.apache.gluten.backendsapi.{BackendsApiManager, RuleApi}
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.extension._
import org.apache.gluten.extension.columnar._
@@ -66,6 +66,9 @@ object VeloxRuleApi {
injector.injectOptimizerRule(RewriteCastFromArray.apply)
injector.injectPostHocResolutionRule(ArrowConvertorRule.apply)
injector.injectOptimizerRule(RewriteUnboundedWindow.apply)
+ if (BackendsApiManager.getSettings.supportAppendDataExec()) {
+
injector.injectPlannerStrategy(SparkShimLoader.getSparkShims.getRewriteCreateTableAsSelect(_))
+ }
}
/**
diff --git
a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
index 3a59135c80..5f62c272f7 100644
--- a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
@@ -319,4 +319,6 @@ trait SparkShims {
def unBase64FunctionFailsOnError(unBase64: UnBase64): Boolean = false
def widerDecimalType(d1: DecimalType, d2: DecimalType): DecimalType
+
+ def getRewriteCreateTableAsSelect(session: SparkSession): SparkStrategy = _
=> Seq.empty
}
diff --git
a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
index 16affb2ad8..c097833435 100644
---
a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
+++
b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
@@ -50,6 +50,7 @@ import
org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV
import org.apache.spark.sql.execution.datasources.v2.text.TextScan
import org.apache.spark.sql.execution.datasources.v2.utils.CatalogUtil
import org.apache.spark.sql.execution.exchange.BroadcastExchangeLike
+import org.apache.spark.sql.extension.RewriteCreateTableAsSelect
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
import org.apache.spark.sql.types.{DecimalType, IntegerType, LongType,
StructField, StructType}
@@ -647,4 +648,7 @@ class Spark34Shims extends SparkShims {
DecimalPrecision.widerDecimalType(d1, d2)
}
+ override def getRewriteCreateTableAsSelect(session: SparkSession):
SparkStrategy = {
+ RewriteCreateTableAsSelect(session)
+ }
}
diff --git
a/shims/spark34/src/main/scala/org/apache/spark/sql/extension/RewriteCreateTableAsSelect.scala
b/shims/spark34/src/main/scala/org/apache/spark/sql/extension/RewriteCreateTableAsSelect.scala
new file mode 100644
index 0000000000..c8970456a6
--- /dev/null
+++
b/shims/spark34/src/main/scala/org/apache/spark/sql/extension/RewriteCreateTableAsSelect.scala
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.extension
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.ResolvedIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogUtils
+import org.apache.spark.sql.catalyst.expressions.{Attribute,
FileSourceMetadataAttribute}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.util.{CharVarcharUtils,
METADATA_COL_ATTR_KEY}
+import org.apache.spark.sql.connector.catalog._
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.CatalogHelper
+import org.apache.spark.sql.connector.catalog.CatalogV2Util.isSessionCatalog
+import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation,
LeafV2CommandExec}
+import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH
+import org.apache.spark.sql.types.{MetadataBuilder, StructType}
+import org.apache.spark.util.Utils
+
+import scala.collection.JavaConverters._
+
+case class RewriteCreateTableAsSelect(session: SparkSession) extends
SparkStrategy {
+
+ override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
+ plan match {
+ case CreateTableAsSelect(
+ ResolvedIdentifier(catalog, ident),
+ parts,
+ query,
+ tableSpec: TableSpec,
+ options,
+ ifNotExists,
+ analyzedQuery) if analyzedQuery.isDefined &&
!supportsV1Command(catalog) =>
+ catalog match {
+ case staging: StagingTableCatalog =>
+ AtomicCreateTableAsSelectExec(
+ staging,
+ ident,
+ parts,
+ analyzedQuery.get,
+ qualifyLocInTableSpec(tableSpec),
+ options,
+ ifNotExists) :: Nil
+ case _ =>
+ CreateTableAsSelectExec(
+ catalog.asTableCatalog,
+ ident,
+ parts,
+ analyzedQuery.get,
+ qualifyLocInTableSpec(tableSpec),
+ options,
+ ifNotExists) :: Nil
+ }
+
+ case _ => Nil
+ }
+ }
+
+ private def supportsV1Command(catalog: CatalogPlugin): Boolean = {
+ isSessionCatalog(catalog) && catalog.isInstanceOf[CatalogExtension]
+ }
+
+ private def qualifyLocInTableSpec(tableSpec: TableSpec): TableSpec = {
+ tableSpec.copy(location =
tableSpec.location.map(makeQualifiedDBObjectPath))
+ }
+
+ private def makeQualifiedDBObjectPath(location: String): String = {
+ CatalogUtils.makeQualifiedDBObjectPath(
+ session.sharedState.conf.get(WAREHOUSE_PATH),
+ location,
+ session.sharedState.hadoopConf)
+ }
+}
+
+/** Port from Spark 3.5. */
+case class AtomicCreateTableAsSelectExec(
+ catalog: StagingTableCatalog,
+ ident: Identifier,
+ partitioning: Seq[Transform],
+ query: LogicalPlan,
+ tableSpec: TableSpec,
+ writeOptions: Map[String, String],
+ ifNotExists: Boolean)
+ extends V2CreateTableAsSelectBaseExec {
+
+ val properties: Map[String, String] =
CatalogV2Util.convertTableProperties(tableSpec)
+
+ override protected def run(): Seq[InternalRow] = {
+ if (catalog.tableExists(ident)) {
+ if (ifNotExists) {
+ return Nil
+ }
+ throw QueryCompilationErrors.tableAlreadyExistsError(ident)
+ }
+
+ val stagedTable = catalog.stageCreate(
+ ident,
+ getV2Columns(query.schema),
+ partitioning.toArray,
+ properties.asJava)
+ writeToTable(catalog, stagedTable, writeOptions, ident, query)
+ }
+}
+
+case class CreateTableAsSelectExec(
+ catalog: TableCatalog,
+ ident: Identifier,
+ partitioning: Seq[Transform],
+ query: LogicalPlan,
+ tableSpec: TableSpec,
+ writeOptions: Map[String, String],
+ ifNotExists: Boolean)
+ extends V2CreateTableAsSelectBaseExec {
+
+ val properties: Map[String, String] =
CatalogV2Util.convertTableProperties(tableSpec)
+
+ override protected def run(): Seq[InternalRow] = {
+ if (catalog.tableExists(ident)) {
+ if (ifNotExists) {
+ return Nil
+ }
+ throw QueryCompilationErrors.tableAlreadyExistsError(ident)
+ }
+ val table = catalog.createTable(
+ ident,
+ getV2Columns(query.schema),
+ partitioning.toArray,
+ properties.asJava)
+ writeToTable(catalog, table, writeOptions, ident, query)
+ }
+}
+
+trait V2CreateTableAsSelectBaseExec extends LeafV2CommandExec {
+ override def output: Seq[Attribute] = Nil
+
+ protected def getV2Columns(schema: StructType): Array[Column] = {
+ val rawSchema =
CharVarcharUtils.getRawSchema(removeInternalMetadata(schema), conf)
+ CatalogV2Util.structTypeToV2Columns(rawSchema.asNullable)
+ }
+
+ protected def writeToTable(
+ catalog: TableCatalog,
+ table: Table,
+ writeOptions: Map[String, String],
+ ident: Identifier,
+ query: LogicalPlan): Seq[InternalRow] = {
+ Utils.tryWithSafeFinallyAndFailureCallbacks({
+ val relation = DataSourceV2Relation.create(table, Some(catalog),
Some(ident))
+ val append = AppendData.byPosition(relation, query, writeOptions)
+ val qe = session.sessionState.executePlan(append)
+ qe.assertCommandExecuted()
+
+ table match {
+ case st: StagedTable => st.commitStagedChanges()
+ case _ =>
+ }
+
+ Nil
+ })(catchBlock = {
+ table match {
+ // Failure rolls back the staged writes and metadata changes.
+ case st: StagedTable => st.abortStagedChanges()
+ case _ => catalog.dropTable(ident)
+ }
+ })
+ }
+
+ val INTERNAL_METADATA_KEYS = Seq(
+ "__autoGeneratedAlias",
+ METADATA_COL_ATTR_KEY,
+ "__qualified_access_only",
+ FileSourceMetadataAttribute.FILE_SOURCE_METADATA_COL_ATTR_KEY,
+ "__file_source_constant_metadata_col",
+ "__file_source_generated_metadata_col"
+ )
+
+ private def removeInternalMetadata(schema: StructType): StructType = {
+ StructType(schema.map {
+ field =>
+ var builder = new MetadataBuilder().withMetadata(field.metadata)
+ INTERNAL_METADATA_KEYS.foreach(key => builder = builder.remove(key))
+ field.copy(metadata = builder.build())
+ })
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]