jonvex commented on code in PR #12933:
URL: https://github.com/apache/hudi/pull/12933#discussion_r1985284719
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##########
@@ -328,29 +329,29 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
* <li>{@code ts = source.sts}</li>
* </ul>
*/
- def projectedJoinedDataset: DataFrame = {
+ private def getProcessedInputDf: DataFrame = {
val resolver = sparkSession.sessionState.analyzer.resolver
- // We want to join the source and target tables.
- // Then we want to project the output so that we have the meta columns
from the target table
- // followed by the data columns of the source table
- val tableMetaCols = mergeInto.targetTable.output.filter(a =>
isMetaField(a.name))
- val joinData =
sparkAdapter.getCatalystPlanUtils.createMITJoin(mergeInto.sourceTable,
mergeInto.targetTable, LeftOuter, Some(mergeInto.mergeCondition), "NONE")
- val incomingDataCols =
joinData.output.filterNot(mergeInto.targetTable.outputSet.contains)
// for pkless table, we need to project the meta columns
val hasPrimaryKey =
hoodieCatalogTable.tableConfig.getRecordKeyFields.isPresent
Review Comment:
Should we extract a helper method for `hasPrimaryKey`? We perform the same
check at line 622, so this would avoid the duplication.
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##########
@@ -328,29 +329,29 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
* <li>{@code ts = source.sts}</li>
* </ul>
*/
- def projectedJoinedDataset: DataFrame = {
+ private def getProcessedInputDf: DataFrame = {
val resolver = sparkSession.sessionState.analyzer.resolver
- // We want to join the source and target tables.
- // Then we want to project the output so that we have the meta columns
from the target table
- // followed by the data columns of the source table
- val tableMetaCols = mergeInto.targetTable.output.filter(a =>
isMetaField(a.name))
- val joinData =
sparkAdapter.getCatalystPlanUtils.createMITJoin(mergeInto.sourceTable,
mergeInto.targetTable, LeftOuter, Some(mergeInto.mergeCondition), "NONE")
- val incomingDataCols =
joinData.output.filterNot(mergeInto.targetTable.outputSet.contains)
// for pkless table, we need to project the meta columns
val hasPrimaryKey =
hoodieCatalogTable.tableConfig.getRecordKeyFields.isPresent
- val projectedJoinPlan = if (!hasPrimaryKey ||
sparkSession.sqlContext.conf.getConfString(SPARK_SQL_OPTIMIZED_WRITES.key(),
"false") == "true") {
+ val inputDataPlan = if (!hasPrimaryKey) {
Review Comment:
Remove "merge" from the config documentation for `SPARK_SQL_OPTIMIZED_WRITES`.
Optionally: create a new config `SPARK_SQL_OPTIMIZED_WRITES_MIT`, defaulting to
false, so this code path can still be used for tables that are not pkless.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]