This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/branch-0.x by this push:
     new 8b16794b34c [HUDI-9088] Fix unnecessary scanning of target table in MERGE INTO on Spark (#12933)
8b16794b34c is described below

commit 8b16794b34c881675e39a97d3e56f3656f7d8b31
Author: Y Ethan Guo <[email protected]>
AuthorDate: Mon Apr 21 14:02:09 2025 -0700

    [HUDI-9088] Fix unnecessary scanning of target table in MERGE INTO on Spark (#12933)
---
 .../scala/org/apache/hudi/DataSourceOptions.scala  |   4 +-
 .../spark/sql/hudi/ProvidesHoodieConfig.scala      |   2 +
 .../hudi/command/MergeIntoHoodieTableCommand.scala |  81 +++++---
 .../sql/hudi/common/HoodieSparkSqlTestBase.scala   |  80 +++++++-
 .../spark/sql/hudi/dml/TestMergeIntoTable.scala    | 210 ++++++++++++---------
 5 files changed, 257 insertions(+), 120 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 45134f91278..a74b3eba1f6 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -29,10 +29,10 @@ import org.apache.hudi.common.util.Option
 import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.hudi.config.{HoodieClusteringConfig, HoodieWriteConfig}
 import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncConfigHolder, 
HiveSyncTool}
+import org.apache.hudi.keygen.{CustomKeyGenerator, NonpartitionedKeyGenerator, 
SimpleKeyGenerator}
 import org.apache.hudi.keygen.KeyGenUtils.inferKeyGeneratorType
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions
 import 
org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory.{getKeyGeneratorClassNameFromType,
 inferKeyGeneratorTypeFromWriteConfig}
-import org.apache.hudi.keygen.{CustomKeyGenerator, NonpartitionedKeyGenerator, 
SimpleKeyGenerator}
 import org.apache.hudi.sync.common.HoodieSyncConfig
 import org.apache.hudi.util.JFunction
 
@@ -668,7 +668,7 @@ object DataSourceWriteOptions {
     .defaultValue("true")
     .markAdvanced()
     .sinceVersion("0.14.0")
-    .withDocumentation("Controls whether spark sql prepped update, delete, and 
merge are enabled.")
+    .withDocumentation("Controls whether spark sql prepped update and delete 
are enabled.")
 
   val OVERWRITE_MODE: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.overwrite.mode")
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index e1bae0dbf3c..4e67742cd4d 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -32,6 +32,7 @@ import org.apache.hudi.keygen.{ComplexKeyGenerator, 
CustomAvroKeyGenerator, Cust
 import org.apache.hudi.sql.InsertMode
 import org.apache.hudi.sync.common.HoodieSyncConfig
 import org.apache.hudi.{DataSourceWriteOptions, HoodieFileIndex}
+
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, 
Literal}
@@ -48,6 +49,7 @@ import org.apache.spark.sql.{SaveMode, SparkSession}
 import org.slf4j.LoggerFactory
 
 import java.util.Locale
+
 import scala.collection.JavaConverters._
 
 trait ProvidesHoodieConfig extends Logging {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index 2449817458d..dfbf9d3785c 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.hudi.command
 
-import org.apache.avro.Schema
+import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, 
HoodieSparkSqlWriter, HoodieSparkUtils, SparkAdapterSupport}
 import org.apache.hudi.AvroConversionUtils.convertStructTypeToAvroSchema
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.HoodieSparkSqlWriter.CANONICALIZE_SCHEMA
@@ -30,19 +30,20 @@ import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hive.HiveSyncConfigHolder
 import org.apache.hudi.sync.common.HoodieSyncConfig
 import org.apache.hudi.util.JFunction.scalaFunction1Noop
-import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, 
HoodieSparkSqlWriter, HoodieSparkUtils, SparkAdapterSupport}
-import org.apache.spark.sql.HoodieCatalystExpressionUtils.{MatchCast, 
attributeEquals}
+
+import org.apache.avro.Schema
 import org.apache.spark.sql._
+import org.apache.spark.sql.HoodieCatalystExpressionUtils.{attributeEquals, 
MatchCast}
 import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
-import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReference
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
AttributeReference, BoundReference, EqualTo, Expression, Literal, 
NamedExpression, PredicateHelper}
+import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReference
 import org.apache.spark.sql.catalyst.plans.LeftOuter
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
 import org.apache.spark.sql.hudi.ProvidesHoodieConfig
 import org.apache.spark.sql.hudi.ProvidesHoodieConfig.{combineOptions, 
getPartitionPathFieldWriteConfig}
 import org.apache.spark.sql.hudi.analysis.HoodieAnalysis.failAnalysis
-import 
org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.{CoercedAttributeReference,
 encodeAsBase64String, stripCasting, toStructType}
+import 
org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.{encodeAsBase64String,
 stripCasting, toStructType, CoercedAttributeReference}
 import 
org.apache.spark.sql.hudi.command.PartialAssignmentMode.PartialAssignmentMode
 import org.apache.spark.sql.hudi.command.payload.ExpressionPayload
 import org.apache.spark.sql.hudi.command.payload.ExpressionPayload._
@@ -105,6 +106,8 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
 
   private lazy val targetTableType = hoodieCatalogTable.tableTypeName
 
+  private lazy val isPrimaryKeylessTable = 
hoodieCatalogTable.primaryKeys.isEmpty
+
   /**
    * Mapping of the Merge-Into-Table (MIT) command's [[targetTable]] attribute 
into
    * corresponding expression (involving reference from the [[sourceTable]]) 
from the MIT
@@ -275,11 +278,11 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
       sparkSession.conf.set("spark.sql.crossJoin.enabled","true")
     }
 
-    val projectedJoinedDF: DataFrame = projectedJoinedDataset
+    val processedInputDf: DataFrame = getProcessedInputDf
     // Create the write parameters
     val props = buildMergeIntoConfig(hoodieCatalogTable)
     // Do the upsert
-    executeUpsert(projectedJoinedDF, props)
+    executeUpsert(processedInputDf, props)
     // Refresh the table in the catalog
     sparkSession.catalog.refreshTable(hoodieCatalogTable.table.qualifiedName)
 
@@ -290,9 +293,25 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
   private val insertingActions: Seq[InsertAction] = 
mergeInto.notMatchedActions.collect { case u: InsertAction => u}
   private val deletingActions: Seq[DeleteAction] = 
mergeInto.matchedActions.collect { case u: DeleteAction => u}
 
+  private def hasPrimaryKey(): Boolean = {
+    hoodieCatalogTable.tableConfig.getRecordKeyFields.isPresent
+  }
+
   /**
-   * Here we're adjusting incoming (source) dataset in case its schema is 
divergent from
-   * the target table, to make sure it (at a bare minimum)
+   * Here we're processing the logical plan of the source table and optionally the target
+   * table to get it prepared for writing the data into the Hudi table:
+   * <ul>
+   * <li> For a target table with record key(s) configured, the source table
+   * [[mergeInto.sourceTable]] is used.
+   * <li> For a primary keyless target table, the source table [[mergeInto.sourceTable]]
+   * and target table [[mergeInto.targetTable]] are left-outer joined based on the
+   * merge condition so that the record key stored in the record key meta column
+   * (`_hoodie_record_key`) is attached to the input records if they are updates.
+   * </ul>
+   *
+   * After getting the initial logical plan to process as above, we're adjusting the incoming
+   * (source) dataset in case its schema is divergent from the target table, to make sure
+   * it contains all the required columns for MERGE INTO (at a bare minimum)
    *
    * <ol>
    *   <li>Contains "primary-key" column (as defined by target table's 
config)</li>
@@ -328,29 +347,31 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
    * <li>{@code ts = source.sts}</li>
    * </ul>
    */
-  def projectedJoinedDataset: DataFrame = {
+  private def getProcessedInputDf: DataFrame = {
     val resolver = sparkSession.sessionState.analyzer.resolver
 
-    // We want to join the source and target tables.
-    // Then we want to project the output so that we have the meta columns 
from the target table
-    // followed by the data columns of the source table
-    val tableMetaCols = mergeInto.targetTable.output.filter(a => 
isMetaField(a.name))
-    val joinData = 
sparkAdapter.getCatalystPlanUtils.createMITJoin(mergeInto.sourceTable, 
mergeInto.targetTable, LeftOuter, Some(mergeInto.mergeCondition), "NONE")
-    val incomingDataCols = 
joinData.output.filterNot(mergeInto.targetTable.outputSet.contains)
-    // for pkless table, we need to project the meta columns
-    val hasPrimaryKey = 
hoodieCatalogTable.tableConfig.getRecordKeyFields.isPresent
-    val projectedJoinPlan = if (!hasPrimaryKey || 
sparkSession.sqlContext.conf.getConfString(SPARK_SQL_OPTIMIZED_WRITES.key(), 
"false") == "true") {
+    // For a pkless table, we need to project the meta columns by joining with the target table;
+    // for a Hudi table with record key, we use the source table and rely on 
Hudi's tagging
+    // to identify inserts, updates, and deletes to avoid the join
+    val inputPlan = if (!hasPrimaryKey()) {
+      // For a primary keyless target table, join the source and target tables.
+      // Then we want to project the output so that we have the meta columns 
from the target table
+      // followed by the data columns of the source table
+      val tableMetaCols = mergeInto.targetTable.output.filter(a => 
isMetaField(a.name))
+      val joinData = 
sparkAdapter.getCatalystPlanUtils.createMITJoin(mergeInto.sourceTable, 
mergeInto.targetTable, LeftOuter, Some(mergeInto.mergeCondition), "NONE")
+      val incomingDataCols = 
joinData.output.filterNot(mergeInto.targetTable.outputSet.contains)
       Project(tableMetaCols ++ incomingDataCols, joinData)
     } else {
-      Project(incomingDataCols, joinData)
+      // For a target table with record key(s) configured, the source table is used
+      mergeInto.sourceTable
     }
 
-    val projectedJoinOutput = projectedJoinPlan.output
+    val inputPlanAttributes = inputPlan.output
 
     val requiredAttributesMap = recordKeyAttributeToConditionExpression ++ 
preCombineAttributeAssociatedExpression
 
     val (existingAttributesMap, missingAttributesMap) = 
requiredAttributesMap.partition {
-      case (keyAttr, _) => projectedJoinOutput.exists(attr => 
resolver(keyAttr.name, attr.name))
+      case (keyAttr, _) => inputPlanAttributes.exists(attr => 
resolver(keyAttr.name, attr.name))
     }
 
     // This is to handle the situation where condition is something like 
"s0.s_id = t0.id" so In the source table
@@ -362,7 +383,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
     //       them according to aforementioned heuristic) to meet Hudi's 
requirements
     val additionalColumns: Seq[NamedExpression] =
       missingAttributesMap.flatMap {
-        case (keyAttr, sourceExpression) if !projectedJoinOutput.exists(attr 
=> resolver(attr.name, keyAttr.name)) =>
+        case (keyAttr, sourceExpression) if !inputPlanAttributes.exists(attr 
=> resolver(attr.name, keyAttr.name)) =>
           Seq(Alias(sourceExpression, keyAttr.name)())
 
         case _ => Seq()
@@ -372,7 +393,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
     // matches to that one of the target table. This is necessary b/c unlike 
Spark, Avro is case-sensitive
     // and therefore would fail downstream if case of corresponding columns 
don't match
     val existingAttributes = existingAttributesMap.map(_._1)
-    val adjustedSourceTableOutput = projectedJoinOutput.map { attr =>
+    val adjustedSourceTableOutput = inputPlanAttributes.map { attr =>
       existingAttributes.find(keyAttr => resolver(keyAttr.name, attr.name)) 
match {
         // To align the casing we just rename the attribute to match that one 
of the
         // target table
@@ -381,7 +402,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
       }
     }
 
-    val amendedPlan = Project(adjustedSourceTableOutput ++ additionalColumns, 
projectedJoinPlan)
+    val amendedPlan = Project(adjustedSourceTableOutput ++ additionalColumns, 
inputPlan)
 
     Dataset.ofRows(sparkSession, amendedPlan)
   }
@@ -575,7 +596,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
     // NOTE: We're relying on [[sourceDataset]] here instead of 
[[mergeInto.sourceTable]],
     //       as it could be amended to add missing primary-key and/or 
pre-combine columns.
     //       Please check [[sourceDataset]] scala-doc for more details
-    (projectedJoinedDataset.queryExecution.analyzed.output ++ 
mergeInto.targetTable.output).filterNot(a => isMetaField(a.name))
+    (getProcessedInputDf.queryExecution.analyzed.output ++ 
mergeInto.targetTable.output).filterNot(a => isMetaField(a.name))
   }
 
   private def resolvesToSourceAttribute(expr: Expression): Boolean = {
@@ -618,9 +639,8 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
     val preCombineField = hoodieCatalogTable.preCombineKey.getOrElse("")
     val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable, 
tableConfig)
     // for pkless tables, we need to enable optimized merge
-    val hasPrimaryKey = tableConfig.getRecordKeyFields.isPresent
-    val enableOptimizedMerge = if (!hasPrimaryKey) "true" else 
sparkSession.sqlContext.conf.getConfString(SPARK_SQL_OPTIMIZED_WRITES.key(), 
"false")
-    val keyGeneratorClassName = if (enableOptimizedMerge == "true") {
+    val isPrimaryKeylessTable = !hasPrimaryKey()
+    val keyGeneratorClassName = if (isPrimaryKeylessTable) {
       classOf[MergeIntoKeyGenerator].getCanonicalName
     } else {
       classOf[SqlKeyGenerator].getCanonicalName
@@ -659,7 +679,8 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
       CANONICALIZE_SCHEMA.key -> "false",
       SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key -> "true",
       HoodieSparkSqlWriter.SQL_MERGE_INTO_WRITES.key -> "true",
-      HoodieWriteConfig.SPARK_SQL_MERGE_INTO_PREPPED_KEY -> 
enableOptimizedMerge,
+      // Only a primary keyless table requires prepped keys and upsert
+      HoodieWriteConfig.SPARK_SQL_MERGE_INTO_PREPPED_KEY -> 
isPrimaryKeylessTable.toString,
       HoodieWriteConfig.COMBINE_BEFORE_UPSERT.key() -> 
(!StringUtils.isNullOrEmpty(preCombineField)).toString
     )
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
index 33f84236faf..ddc745ee5fe 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
@@ -17,22 +17,30 @@
 
 package org.apache.spark.sql.hudi.common
 
-import org.apache.hudi.HoodieSparkRecordMerger
-import org.apache.hudi.common.config.HoodieStorageConfig
-import org.apache.hudi.common.model.HoodieAvroRecordMerger
+import org.apache.hudi.{HoodieSparkRecordMerger, HoodieSparkUtils}
+import org.apache.hudi.common.config.{HoodieCommonConfig, 
HoodieMetadataConfig, HoodieStorageConfig}
+import org.apache.hudi.common.engine.HoodieLocalEngineContext
+import org.apache.hudi.common.model.{HoodieAvroRecordMerger, HoodieRecord}
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.view.{FileSystemViewManager, 
FileSystemViewStorageConfig, SyncableFileSystemView}
+import org.apache.hudi.common.testutils.HoodieTestUtils
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.ExceptionUtil.getRootCause
 import org.apache.hudi.hadoop.fs.HadoopFSUtils
 import org.apache.hudi.index.inmemory.HoodieInMemoryHashIndex
+import org.apache.hudi.metadata.HoodieTableMetadata
+import org.apache.hudi.storage.{HoodieStorage, StoragePath}
 import org.apache.hudi.testutils.HoodieClientTestUtils.{createMetaClient, 
getSparkConfForTest}
+import org.apache.hudi.util.JFunction
 
 import org.apache.hadoop.fs.Path
 import org.apache.spark.SparkConf
+import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import 
org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.checkMessageContains
-import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.types.StructField
 import org.apache.spark.util.Utils
 import org.joda.time.DateTimeZone
 import org.scalactic.source
@@ -155,6 +163,13 @@ class HoodieSparkSqlTestBase extends FunSuite with 
BeforeAndAfterAll {
     assertResult(true)(hasException)
   }
 
+  protected def validateTableSchema(tableName: String,
+                                    expectedStructFields: List[StructField]): 
Unit = {
+    assertResult(expectedStructFields)(
+      spark.sql(s"select * from $tableName").schema.fields
+        .filter(e => 
!HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(e.name)))
+  }
+
   def dropTypeLiteralPrefix(value: Any): Any = {
     value match {
       case s: String =>
@@ -205,6 +220,30 @@ class HoodieSparkSqlTestBase extends FunSuite with 
BeforeAndAfterAll {
     }
   }
 
+  protected def withSparkSqlSessionConfig(configNameValues: (String, String)*
+                                         )(f: => Unit): Unit = {
+    withSparkSqlSessionConfigWithCondition(configNameValues.map(e => (e, 
true)): _*)(f)
+  }
+
+  protected def withSparkSqlSessionConfigWithCondition(configNameValues: 
((String, String), Boolean)*
+                                                      )(f: => Unit): Unit = {
+    try {
+      configNameValues.foreach { case ((configName, configValue), condition) =>
+        if (condition) {
+          spark.sql(s"set $configName=$configValue")
+        }
+      }
+      f
+    } finally {
+      if (HoodieSparkUtils.gteqSpark3_1) {
+        // Spark 3.0.x does not support "RESET configuration_key"
+        configNameValues.foreach { case ((configName, configValue), condition) 
=>
+          spark.sql(s"reset $configName")
+        }
+      }
+    }
+  }
+
   protected def withRecordType(recordTypes: Seq[HoodieRecordType] = 
Seq(HoodieRecordType.AVRO, HoodieRecordType.SPARK),
                                recordConfig: Map[HoodieRecordType, Map[String, 
String]]=Map.empty)(f: => Unit) {
     // TODO HUDI-5264 Test parquet log with avro record in spark sql test
@@ -250,6 +289,39 @@ object HoodieSparkSqlTestBase {
       .getActiveTimeline.getInstantDetails(cleanInstant).get)
   }
 
+  def getMetaClientAndFileSystemView(basePath: String):
+  (HoodieTableMetaClient, SyncableFileSystemView) = {
+    val storageConf = HoodieTestUtils.getDefaultStorageConf
+    val metaClient: HoodieTableMetaClient =
+      
HoodieTableMetaClient.builder.setConf(storageConf).setBasePath(basePath).build
+    val metadataConfig = HoodieMetadataConfig.newBuilder.build
+    val engineContext = new HoodieLocalEngineContext(storageConf)
+    val viewManager: FileSystemViewManager = 
FileSystemViewManager.createViewManager(
+      engineContext, FileSystemViewStorageConfig.newBuilder.build,
+      HoodieCommonConfig.newBuilder.build,
+      JFunction.toJavaSerializableFunctionUnchecked(
+      (_: HoodieTableMetaClient) => {
+        HoodieTableMetadata.create(
+          engineContext, metaClient.getStorage, metadataConfig, 
metaClient.getBasePath.toString)
+      })
+    )
+    val fsView: SyncableFileSystemView = 
viewManager.getFileSystemView(metaClient)
+    (metaClient, fsView)
+  }
+
+  /**
+   * Replaces the existing file with an empty file which is meant to be 
corrupted
+   * in a Hudi table.
+   *
+   * @param storage  [[HoodieStorage]] instance
+   * @param filePath file path
+   */
+  def replaceWithEmptyFile(storage: HoodieStorage,
+                           filePath: StoragePath): Unit = {
+    storage.deleteFile(filePath)
+    storage.createNewFile(filePath)
+  }
+
   private def checkMessageContains(e: Throwable, text: String): Boolean =
     e.getMessage.trim.contains(text.trim)
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable.scala
index 7fe9a753014..b26d42ab33f 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable.scala
@@ -17,109 +17,151 @@
 
 package org.apache.spark.sql.hudi.dml
 
-import org.apache.hudi.DataSourceWriteOptions.SPARK_SQL_OPTIMIZED_WRITES
-import 
org.apache.hudi.config.HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT
 import org.apache.hudi.{DataSourceReadOptions, HoodieDataSourceHelpers, 
HoodieSparkUtils, ScalaAssertionSupport}
-import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.DataSourceWriteOptions.SPARK_SQL_OPTIMIZED_WRITES
 import org.apache.hudi.hadoop.fs.HadoopFSUtils
-import org.apache.hudi.{DataSourceReadOptions, HoodieDataSourceHelpers, 
HoodieSparkUtils, ScalaAssertionSupport}
+
 import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, 
StructField}
+import org.junit.jupiter.api.Assertions.assertTrue
+
+import scala.collection.JavaConverters._
 
 class TestMergeIntoTable extends HoodieSparkSqlTestBase with 
ScalaAssertionSupport {
 
   test("Test MergeInto Basic") {
     Seq(true, false).foreach { sparkSqlOptimizedWrites =>
       withRecordType()(withTempDir { tmp =>
-        spark.sql("set hoodie.payload.combined.schema.validate = false")
-        val tableName = generateTableName
-        // Create table
-        spark.sql(
-          s"""
-             |create table $tableName (
-             |  id int,
-             |  name string,
-             |  price double,
-             |  ts long
-             |) using hudi
-             | location '${tmp.getCanonicalPath}'
-             | tblproperties (
-             |  primaryKey ='id',
-             |  preCombineField = 'ts'
-             | )
+        withSparkSqlSessionConfig("hoodie.payload.combined.schema.validate" -> 
"false",
+          // Column stats and partition stats need to be disabled to validate
+          // that there is no full target table scan during MERGE INTO
+          "hoodie.metadata.index.column.stats.enable" -> "false",
+          "hoodie.metadata.index.partition.stats.enable" -> "false") {
+          val tableName = generateTableName
+          // Create table
+          spark.sql(
+            s"""
+               |create table $tableName (
+               |  id int,
+               |  name string,
+               |  price double,
+               |  ts int,
+               |  partition string
+               |) using hudi
+               | partitioned by (partition)
+               | location '${tmp.getCanonicalPath}'
+               | tblproperties (
+               |  primaryKey ='id',
+               |  preCombineField = 'ts'
+               | )
        """.stripMargin)
 
-        // test with optimized sql merge enabled / disabled.
-        spark.sql(s"set 
${SPARK_SQL_OPTIMIZED_WRITES.key()}=$sparkSqlOptimizedWrites")
+          // test with optimized sql merge enabled / disabled.
+          spark.sql(s"set 
${SPARK_SQL_OPTIMIZED_WRITES.key()}=$sparkSqlOptimizedWrites")
 
-        // First merge with a extra input field 'flag' (insert a new record)
-        spark.sql(
-          s"""
-             | merge into $tableName
-             | using (
-             |  select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '1' as 
flag
-             | ) s0
-             | on s0.id = $tableName.id
-             | when matched and flag = '1' then update set
-             | id = s0.id, name = s0.name, price = s0.price, ts = s0.ts
-             | when not matched and flag = '1' then insert *
+          val structFields = List(
+            StructField("id", IntegerType, nullable = true),
+            StructField("name", StringType, nullable = true),
+            StructField("price", DoubleType, nullable = true),
+            StructField("ts", IntegerType, nullable = true),
+            StructField("partition", StringType, nullable = true))
+          // First merge with an extra input field 'flag' (insert a new record)
+          spark.sql(
+            s"""
+               | merge into $tableName
+               | using (
+               |  select 1 as id, 'a1' as name, 10 as price, 1000 as ts, 'p1' 
as partition, '1' as flag
+               |  union
+               |  select 2 as id, 'a2' as name, 20 as price, 1000 as ts, 'p2' 
as partition, '1' as flag
+               |  union
+               |  select 3 as id, 'a3' as name, 30 as price, 1000 as ts, 'p3' 
as partition, '1' as flag
+               | ) s0
+               | on s0.id = $tableName.id
+               | when matched and flag = '1' then update set
+               | id = s0.id, name = s0.name, price = s0.price, ts = s0.ts
+               | when not matched and flag = '1' then insert *
        """.stripMargin)
-        checkAnswer(s"select id, name, price, ts from $tableName")(
-          Seq(1, "a1", 10.0, 1000)
-        )
+          validateTableSchema(tableName, structFields)
+          checkAnswer(s"select id, name, price, ts, partition from 
$tableName")(
+            Seq(1, "a1", 10.0, 1000, "p1"),
+            Seq(2, "a2", 20.0, 1000, "p2"),
+            Seq(3, "a3", 30.0, 1000, "p3")
+          )
 
-        // Second merge (update the record)
-        spark.sql(
-          s"""
-             | merge into $tableName
-             | using (
-             |  select 1 as id, 'a1' as name, 10 as price, 1001 as ts
-             | ) s0
-             | on s0.id = $tableName.id
-             | when matched then update set
-             | id = s0.id, name = s0.name, price = s0.price + 
$tableName.price, ts = s0.ts
-             | when not matched then insert *
-       """.stripMargin)
-        checkAnswer(s"select id, name, price, ts from $tableName")(
-          Seq(1, "a1", 20.0, 1001)
-        )
+          // Second merge (update the record) with different field names in 
the source
+          spark.sql(
+            s"""
+               | merge into $tableName
+               | using (
+               |  select 1 as _id, 'a1' as name, 10 as _price, 1001 as _ts, 
'p1' as partition
+               | ) s0
+               | on s0._id = $tableName.id
+               | when matched then update set
+               | id = s0._id, name = s0.name, price = s0._price + 
$tableName.price, ts = s0._ts
+               | """.stripMargin)
+          validateTableSchema(tableName, structFields)
+          checkAnswer(s"select id, name, price, ts, partition from 
$tableName")(
+            Seq(1, "a1", 20.0, 1001, "p1"),
+            Seq(2, "a2", 20.0, 1000, "p2"),
+            Seq(3, "a3", 30.0, 1000, "p3")
+          )
 
-        // the third time merge (update & insert the record)
-        spark.sql(
-          s"""
-             | merge into $tableName
-             | using (
-             |  select * from (
-             |  select 1 as id, 'a1' as name, 10 as price, 1002 as ts
-             |  union all
-             |  select 2 as id, 'a2' as name, 12 as price, 1001 as ts
-             |  )
-             | ) s0
-             | on s0.id = $tableName.id
-             | when matched then update set
-             | id = s0.id, name = s0.name, price = s0.price + 
$tableName.price, ts = s0.ts
-             | when not matched and s0.id % 2 = 0 then insert *
+          // the third time merge (update & insert the record)
+          spark.sql(
+            s"""
+               | merge into $tableName
+               | using (
+               |  select * from (
+               |  select 1 as id, 'a1' as name, 10 as price, 1002 as ts, 'p1' 
as partition
+               |  union all
+               |  select 4 as id, 'a4' as name, 40 as price, 1001 as ts, 'p4' 
as partition
+               |  )
+               | ) s0
+               | on s0.id = $tableName.id
+               | when matched then update set
+               | id = s0.id, name = s0.name, price = s0.price + 
$tableName.price, ts = s0.ts
+               | when not matched and s0.id % 2 = 0 then insert *
        """.stripMargin)
-        checkAnswer(s"select id, name, price, ts from $tableName")(
-          Seq(1, "a1", 30.0, 1002),
-          Seq(2, "a2", 12.0, 1001)
-        )
+          validateTableSchema(tableName, structFields)
+          checkAnswer(s"select id, name, price, ts, partition from 
$tableName")(
+            Seq(1, "a1", 30.0, 1002, "p1"),
+            Seq(2, "a2", 20.0, 1000, "p2"),
+            Seq(3, "a3", 30.0, 1000, "p3"),
+            Seq(4, "a4", 40.0, 1001, "p4")
+          )
 
-        // the fourth merge (delete the record)
-        spark.sql(
-          s"""
-             | merge into $tableName
-             | using (
-             |  select 1 as id, 'a1' as name, 12 as price, 1003 as ts
-             | ) s0
-             | on s0.id = $tableName.id
-             | when matched and s0.id != 1 then update set
-             |    id = s0.id, name = s0.name, price = s0.price, ts = s0.ts
-             | when matched and s0.id = 1 then delete
-             | when not matched then insert *
+          // Validate that MERGE INTO only scans the affected partitions in the target table
+          // Corrupt the files in other partitions not receiving updates
+          val (metaClient, fsv) = 
HoodieSparkSqlTestBase.getMetaClientAndFileSystemView(tmp.getCanonicalPath)
+          Seq("p2", "p3", "p4").map(e => "partition=" + e).foreach(partition 
=> {
+            assertTrue(fsv.getLatestFileSlices(partition).count() > 0)
+            
fsv.getLatestFileSlices(partition).iterator().asScala.foreach(fileSlice => {
+              if (fileSlice.getBaseFile.isPresent) {
+                HoodieSparkSqlTestBase.replaceWithEmptyFile(
+                  metaClient.getStorage, 
fileSlice.getBaseFile.get.getStoragePath)
+              }
+              fileSlice.getLogFiles.iterator().asScala.foreach(logFile => {
+                
HoodieSparkSqlTestBase.replaceWithEmptyFile(metaClient.getStorage, 
logFile.getPath)
+              })
+            })
+          })
+          // the fourth merge (delete the record)
+          spark.sql(
+            s"""
+               | merge into $tableName
+               | using (
+               |  select 1 as id, 'a1' as name, 12 as price, 1003 as ts, 'p1' 
as partition
+               | ) s0
+               | on s0.id = $tableName.id
+               | when matched and s0.id != 1 then update set
+               |    id = s0.id, name = s0.name, price = s0.price, ts = s0.ts
+               | when matched and s0.id = 1 then delete
+               | when not matched then insert *
        """.stripMargin)
-        val cnt = spark.sql(s"select * from $tableName where id = 1").count()
-        assertResult(0)(cnt)
+          val cnt = spark.sql(s"select * from $tableName where partition = 
'p1'").count()
+          assertResult(0)(cnt)
+        }
       })
     }
   }
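
For context, below is a minimal Spark SQL sketch (not taken from the commit; table name and
location are illustrative) of the MERGE INTO pattern this change optimizes. With a primary
key configured on the target table, the command now writes from the source table directly
and relies on Hudi's record tagging instead of left-outer joining source and target, so only
the affected partitions/file groups of the target table need to be read.

    // Assumes a spark-shell or SparkSession with the Hudi Spark bundle on the classpath.
    spark.sql(
      """
        |create table merge_demo (
        |  id int, name string, price double, ts int
        |) using hudi
        | location '/tmp/merge_demo'
        | tblproperties (primaryKey = 'id', preCombineField = 'ts')
        |""".stripMargin)

    // Because merge_demo has a primary key, this statement no longer joins with the
    // target table; a primary keyless table would still be joined with the target to
    // pick up `_hoodie_record_key` for updates.
    spark.sql(
      """
        | merge into merge_demo t
        | using (select 1 as id, 'a1' as name, 10.0 as price, 1000 as ts) s
        | on s.id = t.id
        | when matched then update set id = s.id, name = s.name, price = s.price, ts = s.ts
        | when not matched then insert *
        |""".stripMargin)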
