yihua commented on code in PR #12933:
URL: https://github.com/apache/hudi/pull/12933#discussion_r2015003947
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##########
@@ -328,29 +329,29 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
* <li>{@code ts = source.sts}</li>
* </ul>
*/
- def projectedJoinedDataset: DataFrame = {
+ private def getProcessedInputDf: DataFrame = {
val resolver = sparkSession.sessionState.analyzer.resolver
- // We want to join the source and target tables.
- // Then we want to project the output so that we have the meta columns from the target table
- // followed by the data columns of the source table
- val tableMetaCols = mergeInto.targetTable.output.filter(a => isMetaField(a.name))
- val joinData = sparkAdapter.getCatalystPlanUtils.createMITJoin(mergeInto.sourceTable, mergeInto.targetTable, LeftOuter, Some(mergeInto.mergeCondition), "NONE")
- val incomingDataCols = joinData.output.filterNot(mergeInto.targetTable.outputSet.contains)
// for pkless table, we need to project the meta columns
val hasPrimaryKey = hoodieCatalogTable.tableConfig.getRecordKeyFields.isPresent
Review Comment:
Fixed.
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##########
@@ -328,29 +329,29 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
* <li>{@code ts = source.sts}</li>
* </ul>
*/
- def projectedJoinedDataset: DataFrame = {
+ private def getProcessedInputDf: DataFrame = {
val resolver = sparkSession.sessionState.analyzer.resolver
- // We want to join the source and target tables.
- // Then we want to project the output so that we have the meta columns from the target table
- // followed by the data columns of the source table
- val tableMetaCols = mergeInto.targetTable.output.filter(a => isMetaField(a.name))
- val joinData = sparkAdapter.getCatalystPlanUtils.createMITJoin(mergeInto.sourceTable, mergeInto.targetTable, LeftOuter, Some(mergeInto.mergeCondition), "NONE")
- val incomingDataCols = joinData.output.filterNot(mergeInto.targetTable.outputSet.contains)
// for pkless table, we need to project the meta columns
val hasPrimaryKey = hoodieCatalogTable.tableConfig.getRecordKeyFields.isPresent
- val projectedJoinPlan = if (!hasPrimaryKey || sparkSession.sqlContext.conf.getConfString(SPARK_SQL_OPTIMIZED_WRITES.key(), "false") == "true") {
+ val inputDataPlan = if (!hasPrimaryKey) {
Review Comment:
Updated the config docs.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]