szehon-ho commented on a change in pull request #4047:
URL: https://github.com/apache/iceberg/pull/4047#discussion_r806356606



##########
File path: spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala
##########
@@ -226,6 +231,82 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand {
     ReplaceData(writeRelation, mergeRows, relation)
   }
 
+  // build a rewrite plan for sources that support row deltas
+  private def buildWriteDeltaPlan(
+      relation: DataSourceV2Relation,
+      operationTable: RowLevelOperationTable,
+      source: LogicalPlan,
+      cond: Expression,
+      matchedActions: Seq[MergeAction],
+      notMatchedActions: Seq[MergeAction]): WriteDelta = {
+
+    // resolve all needed attrs (e.g. row ID and any required metadata attrs)
+    val rowAttrs = relation.output
+    val rowIdAttrs = resolveRowIdAttrs(relation, operationTable.operation)
+    val metadataAttrs = resolveRequiredMetadataAttrs(relation, operationTable.operation)
+
+    // construct a scan relation and include all required metadata columns
+    val readRelation = buildReadRelation(relation, operationTable, metadataAttrs, rowIdAttrs)
+    val readAttrs = readRelation.output
+
+    // project an extra column to check if a target row exists after the join
+    val targetTableProjExprs = readAttrs :+ Alias(TrueLiteral, ROW_FROM_TARGET)()
+    val targetTableProj = Project(targetTableProjExprs, readRelation)
+
+    // project an extra column to check if a source row exists after the join
+    val sourceTableProjExprs = source.output :+ Alias(TrueLiteral, ROW_FROM_SOURCE)()
+    val sourceTableProj = Project(sourceTableProjExprs, source)
+
+    // use inner join if there is no NOT MATCHED action, unmatched source rows can be discarded
+    // use right outer join in all other cases, unmatched source rows may be needed
+    // also disable broadcasts for the target table to perform the cardinality check later
+    val joinType = if (notMatchedActions.isEmpty) Inner else RightOuter
+    val joinHint = JoinHint(leftHint = Some(HintInfo(Some(NO_BROADCAST_HASH))), rightHint = None)
+    val joinPlan = Join(NoStatsUnaryNode(targetTableProj), sourceTableProj, joinType, Some(cond), joinHint)
+
+    val deleteRowValues = buildDeltaDeleteRowValues(rowAttrs, rowIdAttrs)
+    val metadataReadAttrs = readAttrs.filterNot(relation.outputSet.contains)
+
+    val matchedConditions = matchedActions.map(actionCondition)
+    val matchedOutputs = matchedActions.map { action =>
+      deltaActionOutput(action, deleteRowValues, metadataReadAttrs)
+    }
+
+    val notMatchedConditions = notMatchedActions.map(actionCondition)
+    val notMatchedOutputs = notMatchedActions.map { action =>

Review comment:
       I guess, on the topic of one-lining, this one can be one-lined as well?
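
   A minimal sketch of what the one-lined form could look like; the notMatched body is cut off in this hunk, so this just mirrors the matched case above and assumes the body stays a single `deltaActionOutput` call:

   ```
   val matchedOutputs = matchedActions.map(deltaActionOutput(_, deleteRowValues, metadataReadAttrs))
   ```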

##########
File path: spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala
##########
@@ -226,6 +231,82 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand {
     ReplaceData(writeRelation, mergeRows, relation)
   }
 
+  // build a rewrite plan for sources that support row deltas
+  private def buildWriteDeltaPlan(
+      relation: DataSourceV2Relation,
+      operationTable: RowLevelOperationTable,
+      source: LogicalPlan,
+      cond: Expression,
+      matchedActions: Seq[MergeAction],
+      notMatchedActions: Seq[MergeAction]): WriteDelta = {
+
+    // resolve all needed attrs (e.g. row ID and any required metadata attrs)
+    val rowAttrs = relation.output
+    val rowIdAttrs = resolveRowIdAttrs(relation, operationTable.operation)
+    val metadataAttrs = resolveRequiredMetadataAttrs(relation, operationTable.operation)
+
+    // construct a scan relation and include all required metadata columns
+    val readRelation = buildReadRelation(relation, operationTable, metadataAttrs, rowIdAttrs)
+    val readAttrs = readRelation.output
+
+    // project an extra column to check if a target row exists after the join
+    val targetTableProjExprs = readAttrs :+ Alias(TrueLiteral, ROW_FROM_TARGET)()
+    val targetTableProj = Project(targetTableProjExprs, readRelation)
+
+    // project an extra column to check if a source row exists after the join
+    val sourceTableProjExprs = source.output :+ Alias(TrueLiteral, ROW_FROM_SOURCE)()
+    val sourceTableProj = Project(sourceTableProjExprs, source)
+
+    // use inner join if there is no NOT MATCHED action, unmatched source rows can be discarded
+    // use right outer join in all other cases, unmatched source rows may be needed
+    // also disable broadcasts for the target table to perform the cardinality check later

Review comment:
       Was also trying to compare the two code paths. Noticed this comment has a 'later' and the other does not; the version with 'later' seems clearer. Actually wondering, did you consider pulling some of this repeated code (and these comments) into shared methods? Something like:
   
   ```
   val sourceTableProj = sourceTableProj(source)
   val joinPlan = joinPlan(sourceTableProj, targetTableProj, cond, 
notMatchedActions)
   ``` 
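
   Purely for illustration, a rough sketch of what such helpers might look like; the names (`buildSourceTableProj`, `buildJoinPlan`) and exact signatures are hypothetical, not taken from the PR:

   ```
   // hypothetical helper: tag each source row with an extra flag column
   private def buildSourceTableProj(source: LogicalPlan): Project = {
     Project(source.output :+ Alias(TrueLiteral, ROW_FROM_SOURCE)(), source)
   }

   // hypothetical helper: pick the join type based on NOT MATCHED actions and
   // disable broadcasting the target side so the cardinality check can run later
   private def buildJoinPlan(
       targetTableProj: LogicalPlan,
       sourceTableProj: LogicalPlan,
       cond: Expression,
       notMatchedActions: Seq[MergeAction]): Join = {
     val joinType = if (notMatchedActions.isEmpty) Inner else RightOuter
     val joinHint = JoinHint(leftHint = Some(HintInfo(Some(NO_BROADCAST_HASH))), rightHint = None)
     Join(NoStatsUnaryNode(targetTableProj), sourceTableProj, joinType, Some(cond), joinHint)
   }
   ```

   That would also keep the shared comments (join choice, broadcast hint) in one place for both rewrite paths.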

##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkDistributionAndOrderingUtil.java
##########
@@ -171,12 +194,16 @@ private static Distribution buildPositionDeleteUpdateDistribution(DistributionMo
     }
   }
 
-  public static SortOrder[] buildPositionDeltaOrdering(Table table, Command command, Distribution distribution) {
+  public static SortOrder[] buildPositionDeltaOrdering(Table table, Command command) {
     // the spec requires position delete files to be sorted by file and pos
+    SortOrder[] deleteOrdering = {SPEC_ID_ORDER, PARTITION_ORDER, FILE_PATH_ORDER, ROW_POSITION_ORDER};
+
     if (command == DELETE || command == UPDATE) {
-      return new SortOrder[]{SPEC_ID_ORDER, PARTITION_ORDER, FILE_PATH_ORDER, ROW_POSITION_ORDER};
+      return deleteOrdering;
     } else {
-      throw new IllegalArgumentException("Only position deletes and updates are currently supported");
+      // all metadata columns like _spec_id, _file, _pos will be null for new data records

Review comment:
       Optional: just noticed we throw IllegalArgumentException in other places when we hit an unknown command, so was wondering whether it would be clearer/more consistent to check for 'MERGE' explicitly here and throw the exception for any other command, like the other cases? Up to you.
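
   A rough sketch of that alternative, assuming `Command` also exposes a `MERGE` constant alongside `DELETE` and `UPDATE` (not visible in this hunk):

   ```
   public static SortOrder[] buildPositionDeltaOrdering(Table table, Command command) {
     // the spec requires position delete files to be sorted by file and pos
     SortOrder[] deleteOrdering = {SPEC_ID_ORDER, PARTITION_ORDER, FILE_PATH_ORDER, ROW_POSITION_ORDER};

     if (command == DELETE || command == UPDATE || command == MERGE) {
       // for MERGE, metadata columns like _spec_id, _file, _pos will be null for new data records,
       // so the same delete ordering can still be applied
       return deleteOrdering;
     } else {
       throw new IllegalArgumentException("Only position deletes, updates and merges are currently supported");
     }
   }
   ```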




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


