aokolnychyi commented on code in PR #7443:
URL: https://github.com/apache/iceberg/pull/7443#discussion_r1179305016
##########
spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedV2Writes.scala:
##########
@@ -22,100 +22,40 @@ package org.apache.spark.sql.execution.datasources.v2
import java.util.Optional
import java.util.UUID
import org.apache.spark.sql.catalyst.expressions.PredicateHelper
-import org.apache.spark.sql.catalyst.plans.logical.AppendData
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.plans.logical.OverwriteByExpression
-import org.apache.spark.sql.catalyst.plans.logical.OverwritePartitionsDynamic
import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.catalyst.plans.logical.ReplaceIcebergData
import org.apache.spark.sql.catalyst.plans.logical.WriteIcebergDelta
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.WriteDeltaProjections
-import org.apache.spark.sql.catalyst.utils.PlanUtils.isIcebergRelation
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.write.DeltaWriteBuilder
import org.apache.spark.sql.connector.write.LogicalWriteInfoImpl
-import org.apache.spark.sql.connector.write.SupportsDynamicOverwrite
-import org.apache.spark.sql.connector.write.SupportsOverwrite
-import org.apache.spark.sql.connector.write.SupportsTruncate
import org.apache.spark.sql.connector.write.WriteBuilder
-import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.execution.datasources.DataSourceStrategy
-import org.apache.spark.sql.sources.AlwaysTrue
-import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
/**
- * A rule that is inspired by V2Writes in Spark but supports Iceberg transforms.
+ * A rule that is inspired by V2Writes in Spark but supports Iceberg specific plans.
*/
object ExtendedV2Writes extends Rule[LogicalPlan] with PredicateHelper {
import DataSourceV2Implicits._
override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
- case a @ AppendData(r: DataSourceV2Relation, query, options, _, None, _) if isIcebergRelation(r) =>
- val writeBuilder = newWriteBuilder(r.table, query.schema, options)
- val write = writeBuilder.build()
- val newQuery = ExtendedDistributionAndOrderingUtils.prepareQuery(write, query, conf)
- a.copy(write = Some(write), query = newQuery)
-
- case o @ OverwriteByExpression(r: DataSourceV2Relation, deleteExpr, query, options, _, None, _)
- if isIcebergRelation(r) =>
- // fail if any filter cannot be converted. correctness depends on removing all matching data.
- val filters = splitConjunctivePredicates(deleteExpr).flatMap { pred =>
- val filter = DataSourceStrategy.translateFilter(pred, supportNestedPredicatePushdown = true)
- if (filter.isEmpty) {
- throw QueryCompilationErrors.cannotTranslateExpressionToSourceFilterError(pred)
- }
- filter
- }.toArray
-
- val table = r.table
- val writeBuilder = newWriteBuilder(table, query.schema, options)
- val write = writeBuilder match {
- case builder: SupportsTruncate if isTruncate(filters) =>
- builder.truncate().build()
- case builder: SupportsOverwrite =>
- builder.overwrite(filters).build()
- case _ =>
- throw QueryExecutionErrors.overwriteTableByUnsupportedExpressionError(table)
- }
-
- val newQuery = ExtendedDistributionAndOrderingUtils.prepareQuery(write, query, conf)
- o.copy(write = Some(write), query = newQuery)
-
- case o @ OverwritePartitionsDynamic(r: DataSourceV2Relation, query, options, _, None)
- if isIcebergRelation(r) =>
- val table = r.table
- val writeBuilder = newWriteBuilder(table, query.schema, options)
- val write = writeBuilder match {
- case builder: SupportsDynamicOverwrite =>
- builder.overwriteDynamicPartitions().build()
- case _ =>
- throw QueryExecutionErrors.dynamicPartitionOverwriteUnsupportedByTableError(table)
- }
- val newQuery = ExtendedDistributionAndOrderingUtils.prepareQuery(write, query, conf)
- o.copy(write = Some(write), query = newQuery)
-
case rd @ ReplaceIcebergData(r: DataSourceV2Relation, query, _, None) =>
Review Comment:
We have to keep the Iceberg-specific plans for row-level operations for now, as the built-in Spark plans don't support runtime filtering for UPDATE and MERGE. That support will be part of Spark 3.5.
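
For reference, the branches that remain in this rule follow the same shape as the removed ones: build a write via `newWriteBuilder`, rewrite the incoming query with `ExtendedDistributionAndOrderingUtils.prepareQuery`, and copy the write back into the plan node. A simplified sketch of the retained `ReplaceIcebergData` branch (illustrative only, not the exact retained code; options, row schema, and projection handling are omitted):

```scala
case rd @ ReplaceIcebergData(r: DataSourceV2Relation, query, _, None) =>
  // build a logical write for the Iceberg table (write options simplified to empty here)
  val writeBuilder = newWriteBuilder(r.table, query.schema, Map.empty)
  val write = writeBuilder.build()
  // apply the write's required distribution and ordering to the input query
  val newQuery = ExtendedDistributionAndOrderingUtils.prepareQuery(write, query, conf)
  rd.copy(write = Some(write), query = newQuery)
```

The `WriteIcebergDelta` case is analogous, except it is expected to go through `DeltaWriteBuilder` and use `WriteDeltaProjections` for the row, row-id, and metadata projections.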