jonvex commented on code in PR #9083:
URL: https://github.com/apache/hudi/pull/9083#discussion_r1249609522


##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##########
@@ -301,15 +335,20 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
   def sourceDataset: DataFrame = {
     val resolver = sparkSession.sessionState.analyzer.resolver
 
-    val sourceTablePlan = mergeInto.sourceTable
+    val tablemetacols = mergeInto.targetTable.output.filter(a => isMetaField(a.name))
+    val joinData = sparkAdapter.createMITJoin(mergeInto.sourceTable, mergeInto.targetTable, LeftOuter, Some(mergeInto.mergeCondition), "NONE")
Review Comment:
   No Spark join was happening before this patch. During tagging, the Hudi upsert operation matches each incoming record to its existing record, if one exists. That is functionally equivalent to a join between the incoming records and the existing records on the record key. Because matching was restricted to the record key, we didn't need to worry about the other columns. Now that we want to join on other columns for primary-keyless (pkless) tables, we need to use the Spark join operation, which works for any column.


