aokolnychyi commented on a change in pull request #2189:
URL: https://github.com/apache/iceberg/pull/2189#discussion_r568321993



##########
File path: 
spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
##########
@@ -48,6 +48,55 @@ public void removeTables() {
     sql("DROP TABLE IF EXISTS source");
   }
 
+  // TODO: add tests for multiple NOT MATCHED clauses when we move to Spark 3.1
+
+  @Test
+  public void testMergeInsertOnly() {
+    createAndInitTable("id STRING, v STRING",
+        "{ \"id\": \"a\", \"v\": \"v1\" }\n" +
+        "{ \"id\": \"b\", \"v\": \"v2\" }");
+    createOrReplaceView("source",
+        "{ \"id\": \"a\", \"v\": \"v1_1\" }\n" +
+        "{ \"id\": \"a\", \"v\": \"v1_2\" }\n" +
+        "{ \"id\": \"c\", \"v\": \"v3\" }\n" +
+        "{ \"id\": \"d\", \"v\": \"v4_1\" }\n" +
+        "{ \"id\": \"d\", \"v\": \"v4_2\" }");

Review comment:
       Sounds like a good follow-up task.

##########
File path: 
spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
##########
@@ -48,6 +48,55 @@ public void removeTables() {
     sql("DROP TABLE IF EXISTS source");
   }
 
+  // TODO: add tests for multiple NOT MATCHED clauses when we move to Spark 3.1
+
+  @Test
+  public void testMergeInsertOnly() {
+    createAndInitTable("id STRING, v STRING",
+        "{ \"id\": \"a\", \"v\": \"v1\" }\n" +
+        "{ \"id\": \"b\", \"v\": \"v2\" }");
+    createOrReplaceView("source",
+        "{ \"id\": \"a\", \"v\": \"v1_1\" }\n" +
+        "{ \"id\": \"a\", \"v\": \"v1_2\" }\n" +
+        "{ \"id\": \"c\", \"v\": \"v3\" }\n" +
+        "{ \"id\": \"d\", \"v\": \"v4_1\" }\n" +
+        "{ \"id\": \"d\", \"v\": \"v4_2\" }");
+
+    sql("MERGE INTO %s t USING source " +
+        "ON t.id == source.id " +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT *", tableName);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row("a", "v1"),   // kept
+        row("b", "v2"),   // kept
+        row("c", "v3"),   // new
+        row("d", "v4_1"), // new
+        row("d", "v4_2")  // new
+    );
+    assertEquals("Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testMergeInsertOnlyWithCondition() {
+    createAndInitTable("id INTEGER, v INTEGER", "{ \"id\": 1, \"v\": 1 }");
+    createOrReplaceView("source",
+        "{ \"id\": 1, \"v\": 11, \"is_new\": true }\n" +
+        "{ \"id\": 2, \"v\": 21, \"is_new\": true }\n" +
+        "{ \"id\": 2, \"v\": 22, \"is_new\": false }");
+
+    sql("MERGE INTO %s t USING source s " +
+        "ON t.id == s.id " +
+        "WHEN NOT MATCHED AND is_new = TRUE THEN " +
+        "  INSERT (v, id) VALUES (s.v + 100, s.id)", tableName);

Review comment:
       Added. Resolving this.

##########
File path: 
spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
##########
@@ -158,6 +144,43 @@ case class RewriteMergeInto(spark: SparkSession) extends 
Rule[LogicalPlan] with
     }
   }
 
+  // when there are no matched actions, use a left anti join to remove any matching rows and rewrite to use
+  // append instead of replace. only unmatched source rows are passed to the merge and actions are all inserts.
+  private def buildInsertOnlyMergePlan(
+      targetTableScan: LogicalPlan,
+      targetTableOutput: Seq[AttributeReference],
+      source: LogicalPlan,
+      cond: Expression,
+      notMatchedActions: Seq[MergeAction]): LogicalPlan = {
+
+    notMatchedActions match {
+      case Seq(insertAction: InsertAction) =>
+        val filteredSource = insertAction.condition match {

Review comment:
       This is purely a Spark responsibility, but I added a test too. Resolving this.

##########
File path: 
spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
##########
@@ -158,6 +144,43 @@ case class RewriteMergeInto(spark: SparkSession) extends 
Rule[LogicalPlan] with
     }
   }
 
+  // when there are no matched actions, use a left anti join to remove any matching rows and rewrite to use
+  // append instead of replace. only unmatched source rows are passed to the merge and actions are all inserts.
+  private def buildInsertOnlyMergePlan(
+      targetTableScan: LogicalPlan,
+      targetTableOutput: Seq[AttributeReference],
+      source: LogicalPlan,
+      cond: Expression,
+      notMatchedActions: Seq[MergeAction]): LogicalPlan = {
+
+    notMatchedActions match {
+      case Seq(insertAction: InsertAction) =>
+        val filteredSource = insertAction.condition match {
+          case Some(insertCond) => Filter(insertCond, source)
+          case None => source
+        }
+        val outputExprs = insertAction.assignments.map(_.value)
+        val outputColNames = targetTableOutput.map(_.name)
+        val outputCols = outputExprs.zip(outputColNames).map { case (expr, 
name) => Alias(expr, name)() }
+        val joinPlan = Join(filteredSource, targetTableScan, LeftAnti, 
Some(cond), JoinHint.NONE)
+        Project(outputCols, joinPlan)

Review comment:
       Reverted.

##########
File path: 
spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
##########
@@ -72,23 +74,7 @@ case class RewriteMergeInto(spark: SparkSession) extends 
Rule[LogicalPlan] with
           if matchedActions.isEmpty && isIcebergRelation(target) =>
 
         val targetTableScan = buildSimpleScanPlan(target, cond)
-
-        // when there are no matched actions, use a left anti join to remove 
any matching rows and rewrite to use
-        // append instead of replace. only unmatched source rows are passed to 
the merge and actions are all inserts.
-        val joinPlan = Join(source, targetTableScan, LeftAnti, Some(cond), 
JoinHint.NONE)
-
-        val mergeParams = MergeIntoParams(
-          isSourceRowNotPresent = FALSE_LITERAL,
-          isTargetRowNotPresent = TRUE_LITERAL,
-          matchedConditions = Nil,
-          matchedOutputs = Nil,
-          notMatchedConditions = notMatchedActions.map(getClauseCondition),
-          notMatchedOutputs = notMatchedActions.map(actionOutput),
-          targetOutput = Nil,
-          joinedAttributes = joinPlan.output
-        )
-
-        val mergePlan = MergeInto(mergeParams, target.output, joinPlan)
+        val mergePlan = buildInsertOnlyMergePlan(targetTableScan, 
target.output, source, cond, notMatchedActions)

Review comment:
       Done.

##########
File path: 
spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
##########
@@ -68,6 +70,37 @@ case class RewriteMergeInto(spark: SparkSession) extends 
Rule[LogicalPlan] with
 
   override def apply(plan: LogicalPlan): LogicalPlan = {
     plan resolveOperators {
+      case MergeIntoTable(target: DataSourceV2Relation, source, cond, 
matchedActions, notMatchedActions)
+          if matchedActions.isEmpty && notMatchedActions.size == 1 && 
isIcebergRelation(target) =>
+
+        val targetTableScan = buildSimpleScanPlan(target, cond)
+
+        val insertAction = notMatchedActions match {

Review comment:
       Done.

##########
File path: 
spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
##########
@@ -68,6 +70,37 @@ case class RewriteMergeInto(spark: SparkSession) extends 
Rule[LogicalPlan] with
 
   override def apply(plan: LogicalPlan): LogicalPlan = {
     plan resolveOperators {
+      case MergeIntoTable(target: DataSourceV2Relation, source, cond, 
matchedActions, notMatchedActions)
+          if matchedActions.isEmpty && notMatchedActions.size == 1 && 
isIcebergRelation(target) =>
+
+        val targetTableScan = buildSimpleScanPlan(target, cond)
+
+        val insertAction = notMatchedActions match {
+          case Seq(action: InsertAction) =>
+            action
+          case _ =>
+            throw new AnalysisException("Only insert actions are supported in NOT MATCHED clauses")

Review comment:
       Yep, removed.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org

Reply via email to