aokolnychyi commented on a change in pull request #4047:
URL: https://github.com/apache/iceberg/pull/4047#discussion_r807302964



##########
File path: spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala
##########
@@ -226,6 +231,82 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand {
     ReplaceData(writeRelation, mergeRows, relation)
   }
 
+  // build a rewrite plan for sources that support row deltas
+  private def buildWriteDeltaPlan(
+      relation: DataSourceV2Relation,
+      operationTable: RowLevelOperationTable,
+      source: LogicalPlan,
+      cond: Expression,
+      matchedActions: Seq[MergeAction],
+      notMatchedActions: Seq[MergeAction]): WriteDelta = {
+
+    // resolve all needed attrs (e.g. row ID and any required metadata attrs)
+    val rowAttrs = relation.output
+    val rowIdAttrs = resolveRowIdAttrs(relation, operationTable.operation)
+    val metadataAttrs = resolveRequiredMetadataAttrs(relation, operationTable.operation)
+
+    // construct a scan relation and include all required metadata columns
+    val readRelation = buildReadRelation(relation, operationTable, metadataAttrs, rowIdAttrs)
+    val readAttrs = readRelation.output
+
+    // project an extra column to check if a target row exists after the join
+    val targetTableProjExprs = readAttrs :+ Alias(TrueLiteral, ROW_FROM_TARGET)()
+    val targetTableProj = Project(targetTableProjExprs, readRelation)
+
+    // project an extra column to check if a source row exists after the join
+    val sourceTableProjExprs = source.output :+ Alias(TrueLiteral, ROW_FROM_SOURCE)()
+    val sourceTableProj = Project(sourceTableProjExprs, source)
+
+    // use an inner join if there is no NOT MATCHED action, as unmatched source rows can be discarded
+    // use a right outer join in all other cases, as unmatched source rows may be needed
+    // also disable broadcasts for the target table to perform the cardinality check later

Review comment:
       Removed "later" for consistency in comments. The comments in 
copy-on-write and merge-on-read are slightly different as different join type 
are used. I'll take a look at what extra methods we can introduce to simplify 
this. I did that in a few places already.
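
       For readers following along, here is a minimal sketch of how the join for the merge-on-read plan could be assembled from the projections above. This is an illustration, not necessarily the exact code in this PR: `Join`, `JoinHint`, `HintInfo`, and `NO_BROADCAST_HASH` are Spark catalyst classes (availability may vary by Spark version), and `targetTableProj`, `sourceTableProj`, `cond`, and `notMatchedActions` refer to the snippet above:

```scala
import org.apache.spark.sql.catalyst.plans.{Inner, RightOuter}
import org.apache.spark.sql.catalyst.plans.logical.{HintInfo, Join, JoinHint, NO_BROADCAST_HASH}

// sketch only: inner join when there are no NOT MATCHED actions because
// unmatched source rows can be discarded; right outer join otherwise because
// unmatched source rows may produce inserts
val joinType = if (notMatchedActions.isEmpty) Inner else RightOuter

// sketch only: hint the planner not to broadcast the target side so the
// cardinality check stays valid
val joinHint = JoinHint(leftHint = Some(HintInfo(Some(NO_BROADCAST_HASH))), rightHint = None)

val joinPlan = Join(targetTableProj, sourceTableProj, joinType, Some(cond), joinHint)
```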

##########
File path: spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java
##########
@@ -1567,6 +1567,370 @@ public void testRangePositionDeltaUpdatePartitionedTable() {
         table, UPDATE, expectedDistribution, SPEC_ID_PARTITION_FILE_POSITION_ORDERING);
   }
 
+  // ==================================================================================
+  // Distribution and ordering for merge-on-read MERGE operations with position deletes
+  // ==================================================================================
+  //
+  // UNPARTITIONED UNORDERED
+  // -------------------------------------------------------------------------
+  // merge mode is NOT SET -> rely on write distribution and ordering as a basis
+  // merge mode is NONE -> unspecified distribution + LOCALLY ORDER BY _spec_id, _partition, _file, _pos
+  // merge mode is HASH -> unspecified distribution + LOCALLY ORDER BY _spec_id, _partition, _file, _pos
+  // merge mode is RANGE -> unspecified distribution + LOCALLY ORDER BY _spec_id, _partition, _file, _pos
+  //
+  // UNPARTITIONED ORDERED BY id, data
+  // -------------------------------------------------------------------------
+  // merge mode is NOT SET -> rely on write distribution and ordering as a basis
+  // merge mode is NONE -> unspecified distribution +
+  //                       LOCALLY ORDER BY _spec_id, _partition, _file, _pos, id, data

Review comment:
       Added a comment.
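
       As a hedged illustration of the cases in that comment block, this is how a test table could be switched between the distribution modes exercised above. It assumes Iceberg's standard `TableProperties` constants (`MERGE_MODE`, `MERGE_DISTRIBUTION_MODE`); the helper name is hypothetical:

```scala
import org.apache.iceberg.Table
import org.apache.iceberg.TableProperties

// sketch only: switch a table to merge-on-read MERGE and pick one of the
// distribution modes (NONE/HASH/RANGE) covered in the comment block above
def setMergeDistributionMode(table: Table, mode: String): Unit = {
  table.updateProperties()
    .set(TableProperties.MERGE_MODE, "merge-on-read")
    .set(TableProperties.MERGE_DISTRIBUTION_MODE, mode) // "none", "hash", or "range"
    .commit()
}
```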




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


