aokolnychyi commented on code in PR #7443:
URL: https://github.com/apache/iceberg/pull/7443#discussion_r1179305016


##########
spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedV2Writes.scala:
##########
@@ -22,100 +22,40 @@ package org.apache.spark.sql.execution.datasources.v2
 import java.util.Optional
 import java.util.UUID
 import org.apache.spark.sql.catalyst.expressions.PredicateHelper
-import org.apache.spark.sql.catalyst.plans.logical.AppendData
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.plans.logical.OverwriteByExpression
-import org.apache.spark.sql.catalyst.plans.logical.OverwritePartitionsDynamic
 import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.catalyst.plans.logical.ReplaceIcebergData
 import org.apache.spark.sql.catalyst.plans.logical.WriteIcebergDelta
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.WriteDeltaProjections
-import org.apache.spark.sql.catalyst.utils.PlanUtils.isIcebergRelation
 import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.write.DeltaWriteBuilder
 import org.apache.spark.sql.connector.write.LogicalWriteInfoImpl
-import org.apache.spark.sql.connector.write.SupportsDynamicOverwrite
-import org.apache.spark.sql.connector.write.SupportsOverwrite
-import org.apache.spark.sql.connector.write.SupportsTruncate
 import org.apache.spark.sql.connector.write.WriteBuilder
-import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.execution.datasources.DataSourceStrategy
-import org.apache.spark.sql.sources.AlwaysTrue
-import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
 
 /**
- * A rule that is inspired by V2Writes in Spark but supports Iceberg 
transforms.
+ * A rule that is inspired by V2Writes in Spark but supports Iceberg specific 
plans.
  */
 object ExtendedV2Writes extends Rule[LogicalPlan] with PredicateHelper {
 
   import DataSourceV2Implicits._
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
-    case a @ AppendData(r: DataSourceV2Relation, query, options, _, None, _) 
if isIcebergRelation(r) =>
-      val writeBuilder = newWriteBuilder(r.table, query.schema, options)
-      val write = writeBuilder.build()
-      val newQuery = ExtendedDistributionAndOrderingUtils.prepareQuery(write, 
query, conf)
-      a.copy(write = Some(write), query = newQuery)
-
-    case o @ OverwriteByExpression(r: DataSourceV2Relation, deleteExpr, query, 
options, _, None, _)
-        if isIcebergRelation(r) =>
-      // fail if any filter cannot be converted. correctness depends on 
removing all matching data.
-      val filters = splitConjunctivePredicates(deleteExpr).flatMap { pred =>
-        val filter = DataSourceStrategy.translateFilter(pred, 
supportNestedPredicatePushdown = true)
-        if (filter.isEmpty) {
-          throw 
QueryCompilationErrors.cannotTranslateExpressionToSourceFilterError(pred)
-        }
-        filter
-      }.toArray
-
-      val table = r.table
-      val writeBuilder = newWriteBuilder(table, query.schema, options)
-      val write = writeBuilder match {
-        case builder: SupportsTruncate if isTruncate(filters) =>
-          builder.truncate().build()
-        case builder: SupportsOverwrite =>
-          builder.overwrite(filters).build()
-        case _ =>
-          throw 
QueryExecutionErrors.overwriteTableByUnsupportedExpressionError(table)
-      }
-
-      val newQuery = ExtendedDistributionAndOrderingUtils.prepareQuery(write, 
query, conf)
-      o.copy(write = Some(write), query = newQuery)
-
-    case o @ OverwritePartitionsDynamic(r: DataSourceV2Relation, query, 
options, _, None)
-        if isIcebergRelation(r) =>
-      val table = r.table
-      val writeBuilder = newWriteBuilder(table, query.schema, options)
-      val write = writeBuilder match {
-        case builder: SupportsDynamicOverwrite =>
-          builder.overwriteDynamicPartitions().build()
-        case _ =>
-          throw 
QueryExecutionErrors.dynamicPartitionOverwriteUnsupportedByTableError(table)
-      }
-      val newQuery = ExtendedDistributionAndOrderingUtils.prepareQuery(write, 
query, conf)
-      o.copy(write = Some(write), query = newQuery)
-
     case rd @ ReplaceIcebergData(r: DataSourceV2Relation, query, _, None) =>

Review Comment:
   We have to keep plans for row-level operations for now as Spark plans don't 
support runtime filtering for UPDATE and MERGE. It will be part of Spark 3.5.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to