This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e4cebe29d6 [HUDI-8486] Enforce data type match for required columns in Spark SQL MERGE INTO (#12798)
3e4cebe29d6 is described below

commit 3e4cebe29d6727f3e684af3ca2e0b7c18e921f69
Author: Davis-Zhang-Onehouse <[email protected]>
AuthorDate: Thu Feb 27 06:41:58 2025 -0800

    [HUDI-8486] Enforce data type match for required columns in Spark SQL MERGE INTO (#12798)
    
    Co-authored-by: Y Ethan Guo <[email protected]>
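    
    For illustration (table name and values here are hypothetical, not part of
    the patch): with a target table whose record key `id` is INT, a statement
    like the following now fails analysis instead of writing implicitly cast
    values:
    
        MERGE INTO target_tbl t
        USING (SELECT CAST(1 AS BIGINT) AS id, 'a1' AS name, 1000 AS ts) s
        ON t.id = s.id
        WHEN MATCHED THEN UPDATE SET *
    
        -- Primary key data type mismatch between source table and target table.
        -- Target table uses IntegerType for column 'id', source table uses
        -- LongType for 's.id'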
---
 .../hudi/command/MergeIntoHoodieTableCommand.scala | 221 ++++--
 .../src/test/resources/sql-statements.sql          |  16 +-
 .../TestPartitionStatsIndexWithSql.scala           |   2 +-
 .../sql/hudi/common/HoodieSparkSqlTestBase.scala   |  59 +-
 .../hudi/common/TestTableColumnTypeMismatch.scala  | 816 +++++++++++++++++++++
 .../apache/spark/sql/hudi/ddl/TestAlterTable.scala |   3 +-
 .../spark/sql/hudi/dml/TestInsertTable.scala       |   2 +-
 .../sql/hudi/dml/TestMergeIntoLogOnlyTable.scala   |   4 +-
 .../spark/sql/hudi/dml/TestMergeIntoTable.scala    | 119 +--
 .../spark/sql/hudi/dml/TestMergeIntoTable2.scala   | 208 +++---
 .../TestMergeIntoTableWithNonRecordKeyField.scala  |   6 +-
 .../hudi/dml/TestMergeModeCommitTimeOrdering.scala |   3 +
 .../hudi/dml/TestMergeModeEventTimeOrdering.scala  |   5 +-
 .../hudi/dml/TestPartialUpdateForMergeInto.scala   |  30 +-
 14 files changed, 1202 insertions(+), 292 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index 4588bb4893e..7f01bed85cd 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -57,6 +57,19 @@ import java.util.Base64
 
 import scala.collection.JavaConverters._
 
+/**
+ * Exception thrown when field resolution fails during MERGE INTO validation
+ */
+class MergeIntoFieldResolutionException(message: String)
+  extends AnalysisException(s"MERGE INTO field resolution error: $message")
+
+/**
+ * Exception thrown when field type does not match between source and target table
+ * during MERGE INTO validation
+ */
+class MergeIntoFieldTypeMismatchException(message: String)
+  extends AnalysisException(s"MERGE INTO field type mismatch error: $message")
+
 /**
  * Hudi's implementation of the {@code MERGE INTO} (MIT) Spark SQL statement.
  *
@@ -172,7 +185,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
           //
           //       Which (in the current design) could result in a record key of the record being modified,
           //       which is not allowed.
-          if (!resolvesToSourceAttribute(expr)) {
+          if (!resolvesToSourceAttribute(mergeInto.sourceTable, expr)) {
             throw new AnalysisException("Only simple conditions of the form `t.id = s.id` are allowed on the " +
               s"primary-key and partition path column. Found `${attr.sql} = ${expr.sql}`")
           }
@@ -241,36 +254,16 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
   /**
    * Please check description for [[primaryKeyAttributeToConditionExpression]]
    */
-  private lazy val preCombineAttributeAssociatedExpression: Option[(Attribute, Expression)] = {
-    val resolver = sparkSession.sessionState.analyzer.resolver
+  private lazy val preCombineAttributeAssociatedExpression: Option[(Attribute, Expression)] =
     hoodieCatalogTable.preCombineKey.map { preCombineField =>
-      val targetPreCombineAttribute =
-        mergeInto.targetTable.output
-          .find { attr => resolver(attr.name, preCombineField) }
-          .get
-
-      // To find corresponding "precombine" attribute w/in the [[sourceTable]] we do
-      //    - Check if we can resolve the attribute w/in the source table as is; if unsuccessful, then
-      //    - Check if in any of the update actions, right-hand side of the assignment actually resolves
-      //    to it, in which case we will determine left-hand side expression as the value of "precombine"
-      //    attribute w/in the [[sourceTable]]
-      val sourceExpr = {
-        mergeInto.sourceTable.output.find(attr => resolver(attr.name, preCombineField)) match {
-          case Some(attr) => attr
-          case None =>
-            updatingActions.flatMap(_.assignments).collectFirst {
-              case Assignment(attr: AttributeReference, expr)
-                if resolver(attr.name, preCombineField) && resolvesToSourceAttribute(expr) => expr
-            } getOrElse {
-              throw new AnalysisException(s"Failed to resolve precombine field `${preCombineField}` w/in the source-table output")
-            }
-
-        }
-      }
-
-      (targetPreCombineAttribute, sourceExpr)
+      resolveFieldAssociationsBetweenSourceAndTarget(
+        sparkSession.sessionState.conf.resolver,
+        mergeInto.targetTable,
+        mergeInto.sourceTable,
+        Seq(preCombineField),
+        "precombine field",
+        updatingActions.flatMap(_.assignments)).head
     }
-  }
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     this.sparkSession = sparkSession
@@ -708,16 +701,6 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
     (projectedJoinedDataset.queryExecution.analyzed.output ++ mergeInto.targetTable.output).filterNot(a => isMetaField(a.name))
   }
 
-  private def resolvesToSourceAttribute(expr: Expression): Boolean = {
-    val sourceTableOutputSet = mergeInto.sourceTable.outputSet
-    expr match {
-      case attr: AttributeReference => sourceTableOutputSet.contains(attr)
-      case MatchCast(attr: AttributeReference, _, _, _) => sourceTableOutputSet.contains(attr)
-
-      case _ => false
-    }
-  }
-
   private def validateInsertingAssignmentExpression(expr: Expression): Unit = {
     val sourceTableOutput = mergeInto.sourceTable.output
     expr.collect { case br: BoundReference => br }
@@ -819,11 +802,9 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
     // Precombine field and record key field must be present in the assignment clause of all insert actions for event time ordering mode.
     // Check has no effect if we don't have such fields in target table or we don't have insert actions
     // Please note we are relying on merge mode in the table config as writer merge mode is always "CUSTOM" for MIT.
-    if (RecordMergeMode.EVENT_TIME_ORDERING.name()
-      .equals(getStringWithAltKeys(props.asJava.asInstanceOf[java.util.Map[String, Object]],
-        HoodieTableConfig.RECORD_MERGE_MODE))) {
+    if (isEventTimeOrdering(props)) {
       insertActions.foreach(action =>
-        hoodieCatalogTable.preCombineKey.foreach(
+        hoodieCatalogTable.preCombineKey.map(
           field => {
             validateTargetTableAttrExistsInAssignments(
               sparkSession.sessionState.conf.resolver,
@@ -834,15 +815,84 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
           }))
     }
     insertActions.foreach(action =>
-      hoodieCatalogTable.preCombineKey.foreach(
-      field => {
       validateTargetTableAttrExistsInAssignments(
         sparkSession.sessionState.conf.resolver,
         mergeInto.targetTable,
         hoodieCatalogTable.tableConfig.getRecordKeyFields.orElse(Array.empty),
         "record key field",
-        action.assignments)
-      }))
+        action.assignments))
+
+    val insertAssignments = insertActions.flatMap(_.assignments)
+    checkSchemaMergeIntoCompatibility(insertAssignments, props)
+  }
+
+  private def isEventTimeOrdering(props: Map[String, String]) = {
+    RecordMergeMode.EVENT_TIME_ORDERING.name()
+      .equals(getStringWithAltKeys(props.asJava.asInstanceOf[java.util.Map[String, Object]],
+        HoodieTableConfig.RECORD_MERGE_MODE))
+  }
+
+  /**
+    * Check the merge into schema compatibility between the target table and the source table.
+    * The merge into schema compatibility requires data type matching for the following fields:
+    * 1. Partition key
+    * 2. Primary key
+    * 3. Precombine key
+    *
+    * @param assignments the assignment clause of the insert/update statement for figuring out
+    *                    the mapping between the target table and the source table.
+    */
+  private def checkSchemaMergeIntoCompatibility(assignments: Seq[Assignment], props: Map[String, String]): Unit = {
+    if (assignments.nonEmpty) {
+      // Assert data type matching for partition key
+      hoodieCatalogTable.partitionFields.foreach {
+        partitionField => {
+          try {
+            val association = resolveFieldAssociationsBetweenSourceAndTarget(
+              sparkSession.sessionState.conf.resolver,
+              mergeInto.targetTable,
+              mergeInto.sourceTable,
+              Seq(partitionField),
+              "partition key",
+              assignments).head
+            validateDataTypes(association._1, association._2, "Partition key")
+          } catch {
+            // Only catch AnalysisException from resolveFieldAssociationsBetweenSourceAndTarget
+            case _: MergeIntoFieldResolutionException =>
+          }
+        }
+      }
+      val primaryAttributeAssociatedExpression: Array[(Attribute, Expression)] =
+        resolveFieldAssociationsBetweenSourceAndTarget(
+          sparkSession.sessionState.conf.resolver,
+          mergeInto.targetTable,
+          mergeInto.sourceTable,
+          hoodieCatalogTable.primaryKeys,
+          "primary key",
+          assignments).toArray
+      primaryAttributeAssociatedExpression.foreach { case (attr, expr) =>
+        validateDataTypes(attr, expr, "Primary key")
+      }
+      if (isEventTimeOrdering(props)) {
+        hoodieCatalogTable.preCombineKey.map {
+          preCombineField => {
+            try {
+              val association = resolveFieldAssociationsBetweenSourceAndTarget(
+                sparkSession.sessionState.conf.resolver,
+                mergeInto.targetTable,
+                mergeInto.sourceTable,
+                Seq(preCombineField),
+                "precombine field",
+                assignments).head
+              validateDataTypes(association._1, association._2, "Precombine field")
+            } catch {
+              // Only catch AnalysisException from resolveFieldAssociationsBetweenSourceAndTarget
+              case _: MergeIntoFieldResolutionException =>
+            }
+          }
+        }
+      }
+    }
   }
 
   private def checkUpdatingActions(updateActions: Seq[UpdateAction], props: Map[String, String]): Unit = {
@@ -854,6 +904,9 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
         s"The number of update assignments[${update.assignments.length}] must be less than or equal to the " +
           s"targetTable field size[${targetTableSchema.length}]"))
 
+    val updateAssignments = updateActions.flatMap(_.assignments)
+    checkSchemaMergeIntoCompatibility(updateAssignments, props)
+
     if (targetTableType == MOR_TABLE_TYPE_OPT_VAL) {
       // For MOR table, the target table field cannot be the right-value in the update action.
       updateActions.foreach(update => {
@@ -924,26 +977,82 @@ object MergeIntoHoodieTableCommand {
                                                  fields: Seq[String],
                                                  fieldType: String,
                                                  assignments: Seq[Assignment]): Unit = {
-    // To find corresponding [[fieldType]] attribute w/in the [[assignments]] we do
-    //    - Check if target table itself has the attribute
-    //    - Check if in any of the assignment actions, whose right-hand side attribute
-    // resolves to the source attribute. For example,
-    //        WHEN MATCHED THEN UPDATE SET targetTable.attribute = <expr>
-    // the left-hand side of the assignment can be resolved to the target fields we are
-    // validating here.
     fields.foreach { field =>
       targetTable.output
         .find(attr => resolver(attr.name, field))
-        .getOrElse(throw new AnalysisException(s"Failed to resolve $fieldType `$field` in target table"))
+        .getOrElse(throw new MergeIntoFieldResolutionException(s"Failed to resolve $fieldType `$field` in target table"))
 
       if (!assignments.exists {
         case Assignment(attr: AttributeReference, _) if resolver(attr.name, field) => true
         case _ => false
       }) {
-        throw new AnalysisException(s"No matching assignment found for target table $fieldType `$field`")
+        throw new MergeIntoFieldResolutionException(s"No matching assignment found for target table $fieldType `$field`")
       }
     }
   }
+
+  /**
+   * Generic method to resolve field associations between target and source tables
+   *
+   * @param resolver The resolver to use
+   * @param targetTable The target table of the merge
+   * @param sourceTable The source table of the merge
+   * @param fields The fields from the target table whose association with the source to be resolved
+   * @param fieldType String describing the type of field (for error messages)
+   * @param assignments The assignments clause of the merge into used for resolving the association
+   * @return Sequence of resolved (target table attribute, source table expression)
+   * mapping for target [[fields]].
+   *
+   * @throws AnalysisException if a field cannot be resolved
+   */
+  def resolveFieldAssociationsBetweenSourceAndTarget(resolver: Resolver,
+                                                     targetTable: LogicalPlan,
+                                                     sourceTable: LogicalPlan,
+                                                     fields: Seq[String],
+                                                     fieldType: String,
+                                                     assignments: Seq[Assignment]
+                             ): Seq[(Attribute, Expression)] = {
+    fields.map { field =>
+      val targetAttribute = targetTable.output
+        .find(attr => resolver(attr.name, field))
+        .getOrElse(throw new MergeIntoFieldResolutionException(
+          s"Failed to resolve $fieldType `$field` in target table"))
+
+      val sourceExpr = sourceTable.output
+        .find(attr => resolver(attr.name, field))
+        .getOrElse {
+          assignments.collectFirst {
+            case Assignment(attr: AttributeReference, expr)
+              if resolver(attr.name, field) && resolvesToSourceAttribute(sourceTable, expr) => expr
+          }.getOrElse {
+            throw new MergeIntoFieldResolutionException(
+              s"Failed to resolve $fieldType `$field` w/in the source-table output")
+          }
+        }
+
+      (targetAttribute, sourceExpr)
+    }
+  }
+
+  def resolvesToSourceAttribute(sourceTable: LogicalPlan, expr: Expression): Boolean = {
+    val sourceTableOutputSet = sourceTable.outputSet
+    expr match {
+      case attr: AttributeReference => sourceTableOutputSet.contains(attr)
+      case MatchCast(attr: AttributeReference, _, _, _) => sourceTableOutputSet.contains(attr)
+
+      case _ => false
+    }
+  }
+
+  def validateDataTypes(attr: Attribute, expr: Expression, columnType: String): Unit = {
+    if (attr.dataType != expr.dataType) {
+      throw new MergeIntoFieldTypeMismatchException(
+        s"$columnType data type mismatch between source table and target table. " +
+          s"Target table uses ${attr.dataType} for column '${attr.name}', " +
+          s"source table uses ${expr.dataType} for '${expr.sql}'"
+      )
+    }
+  }
 }
 
 object PartialAssignmentMode extends Enumeration {
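
A minimal sketch of the strict comparison validateDataTypes performs above
(standalone Scala, assuming only spark-catalyst on the classpath; variable
names are illustrative, not from the patch):

    import org.apache.spark.sql.types.{IntegerType, LongType}

    // validateDataTypes rejects any inequality of Catalyst data types, so even
    // a widening Spark would cast implicitly (int -> bigint) fails for record
    // key, partition path, and precombine columns.
    val targetKeyType = IntegerType
    val sourceKeyType = LongType
    assert(targetKeyType != sourceKeyType) // this inequality is what triggers MergeIntoFieldTypeMismatchException
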
diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/sql-statements.sql b/hudi-spark-datasource/hudi-spark/src/test/resources/sql-statements.sql
index 29fbb0ba745..4aacb206a59 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/resources/sql-statements.sql
+++ b/hudi-spark-datasource/hudi-spark/src/test/resources/sql-statements.sql
@@ -63,10 +63,10 @@ select id, name, price, cast(dt as string) from h0_p;
 # CREATE TABLE
 
 create table h1 (
-  id bigint,
+  id int,
   name string,
   price double,
-  ts bigint
+  ts int
 ) using hudi
 options (
   type = '${tableType}',
@@ -79,10 +79,10 @@ location '${tmpDir}/h1';
 +----------+
 
 create table h1_p (
-  id bigint,
+  id int,
   name string,
   price double,
-  ts bigint,
+  ts int,
   dt string
 ) using hudi
 partitioned by (dt)
@@ -205,7 +205,7 @@ select id, name, price, ts, dt from h1_p order by id;
 
 merge into h1_p t0
 using (
-  select 5 as _id, 'a5' as _name, 10 as _price, 1000L as _ts, '2021-05-08' as dt
+  select 5 as _id, 'a5' as _name, 10 as _price, 1000 as _ts, '2021-05-08' as dt
 ) s0
 on s0._id = t0.id
 when matched then update set id = _id, name = _name, price = _price, ts = _ts, dt = s0.dt
@@ -224,11 +224,11 @@ select id, name, price, ts, dt from h1_p order by id;
 
 merge into h1_p t0
 using (
-  select 1 as id, '_delete' as name, 10 as price, 1000L as ts, '2021-05-07' as dt
+  select 1 as id, '_delete' as name, 10 as price, 1000 as ts, '2021-05-07' as dt
   union
-  select 2 as id, '_update' as name, 12 as price, 1001L as ts, '2021-05-07' as dt
+  select 2 as id, '_update' as name, 12 as price, 1001 as ts, '2021-05-07' as dt
   union
-  select 6 as id, '_insert' as name, 10 as price, 1000L as ts, '2021-05-08' as dt
+  select 6 as id, '_insert' as name, 10 as price, 1000 as ts, '2021-05-08' as dt
 ) s0
 on s0.id = t0.id
 when matched and s0.name = '_update'
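
The literal edits above follow from the new check: once h1/h1_p declare
`ts int`, a BIGINT literal such as 1000L in the MERGE source would no longer
type-match the precombine column, so the fixtures now use plain INT literals.
A minimal sketch of the shape that would now be rejected (hypothetical):

    merge into h1_p t0
    using (select 5 as _id, 1000L as _ts) s0   -- _ts is BIGINT, t0.ts is INT
    on s0._id = t0.id
    when matched then update set ts = _ts
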
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
index 4436c907bac..8581af14234 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
@@ -161,7 +161,7 @@ class TestPartitionStatsIndexWithSql extends HoodieSparkSqlTestBase {
         spark.sql(
           s"""
              |merge into $tableName h0
-             |using (select 1 as id, 'a1' as name, 11 as price, 1001 as ts, '$partitionValue' as dt) s0
+             |using (select 1 as id, 'a1' as name, 11 as price, 1001 as ts, cast('$partitionValue' as Date) as dt) s0
              |on h0.id = s0.id
              |when matched then update set *
              |""".stripMargin)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
index d29b1c24c1e..74826e9cfaa 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
@@ -19,15 +19,21 @@ package org.apache.spark.sql.hudi.common
 
 import org.apache.hudi.{DefaultSparkRecordMerger, HoodieSparkUtils}
 import org.apache.hudi.HoodieFileIndex.DataSkippingFailureMode
-import org.apache.hudi.common.config.HoodieStorageConfig
-import org.apache.hudi.common.model.{HoodieAvroRecordMerger, HoodieRecord}
+import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig, HoodieStorageConfig}
+import org.apache.hudi.common.engine.HoodieLocalEngineContext
+import org.apache.hudi.common.model.{FileSlice, HoodieAvroRecordMerger, HoodieLogFile, HoodieRecord}
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
-import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.table.log.HoodieLogFileReader
+import org.apache.hudi.common.table.log.block.HoodieDeleteBlock
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils
+import org.apache.hudi.common.table.view.{FileSystemViewManager, FileSystemViewStorageConfig, SyncableFileSystemView}
+import org.apache.hudi.common.testutils.HoodieTestUtils
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.ExceptionUtil.getRootCause
 import org.apache.hudi.hadoop.fs.HadoopFSUtils
 import org.apache.hudi.index.inmemory.HoodieInMemoryHashIndex
+import org.apache.hudi.metadata.HoodieTableMetadata
 import org.apache.hudi.storage.HoodieStorage
 import org.apache.hudi.testutils.HoodieClientTestUtils.{createMetaClient, getSparkConfForTest}
 
@@ -39,13 +45,13 @@ import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.checkMessageConta
 import org.apache.spark.sql.types.StructField
 import org.apache.spark.util.Utils
 import org.joda.time.DateTimeZone
-import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
 import org.scalactic.source
 import org.scalatest.{BeforeAndAfterAll, FunSuite, Tag}
 import org.slf4j.LoggerFactory
 
 import java.io.File
-import java.util.TimeZone
+import java.util.{Collections, Optional, TimeZone}
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.regex.Pattern
 
@@ -386,6 +392,49 @@ object HoodieSparkSqlTestBase {
       .getActiveTimeline.getInstantDetails(cleanInstant).get)
   }
 
+  def getMetaClientAndFileSystemView(basePath: String):
+  (HoodieTableMetaClient, SyncableFileSystemView) = {
+    val storageConf = HoodieTestUtils.getDefaultStorageConf
+    val metaClient: HoodieTableMetaClient =
+      HoodieTableMetaClient.builder.setConf(storageConf).setBasePath(basePath).build
+    val metadataConfig = HoodieMetadataConfig.newBuilder.build
+    val engineContext = new HoodieLocalEngineContext(storageConf)
+    val viewManager: FileSystemViewManager = FileSystemViewManager.createViewManager(
+      engineContext, metadataConfig, FileSystemViewStorageConfig.newBuilder.build,
+      HoodieCommonConfig.newBuilder.build,
+      (_: HoodieTableMetaClient) => {
+        HoodieTableMetadata.create(
+          engineContext, metaClient.getStorage, metadataConfig, metaClient.getBasePath.toString)
+      }
+    )
+    val fsView: SyncableFileSystemView = viewManager.getFileSystemView(metaClient)
+    (metaClient, fsView)
+  }
+
+  def validateDeleteLogBlockPrecombineNullOrZero(basePath: String): Unit = {
+    val (metaClient, fsView) = getMetaClientAndFileSystemView(basePath)
+    val fileSlice: Optional[FileSlice] = fsView.getAllFileSlices("").findFirst()
+    assertTrue(fileSlice.isPresent)
+    val logFilePathList: java.util.List[String] = HoodieTestUtils.getLogFileListFromFileSlice(fileSlice.get)
+    Collections.sort(logFilePathList)
+    var deleteLogBlockFound = false
+    val avroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema
+    for (i <- 0 until logFilePathList.size()) {
+      val logReader = new HoodieLogFileReader(
+        metaClient.getStorage, new HoodieLogFile(logFilePathList.get(i)),
+        avroSchema, 1024 * 1024, false, false,
+        "id", null)
+      assertTrue(logReader.hasNext)
+      val logBlock = logReader.next()
+      if (logBlock.isInstanceOf[HoodieDeleteBlock]) {
+        val deleteLogBlock = logBlock.asInstanceOf[HoodieDeleteBlock]
+        assertTrue(deleteLogBlock.getRecordsToDelete.forall(i => i.getOrderingValue() == 0 || i.getOrderingValue() == null))
+        deleteLogBlockFound = true
+      }
+    }
+    assertTrue(deleteLogBlockFound)
+  }
+
   def validateTableConfig(storage: HoodieStorage,
                           basePath: String,
                           expectedConfigs: Map[String, String],
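
A hypothetical usage of the new helper from a test body (not part of the
patch; `tablePath` stands for the table's base path):

    // After a MERGE INTO ... WHEN MATCHED THEN DELETE against a MOR table,
    // assert the resulting delete log blocks carry a null/zero ordering value.
    HoodieSparkSqlTestBase.validateDeleteLogBlockPrecombineNullOrZero(tablePath)
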
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestTableColumnTypeMismatch.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestTableColumnTypeMismatch.scala
new file mode 100644
index 00000000000..b9ecf2494ce
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestTableColumnTypeMismatch.scala
@@ -0,0 +1,816 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import org.apache.hudi.{DataSourceWriteOptions, ScalaAssertionSupport}
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.hudi.ErrorMessageChecker.isIncompatibleDataException
+import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
+
+class TestTableColumnTypeMismatch extends HoodieSparkSqlTestBase with ScalaAssertionSupport {
+
+  test("Test Spark successful implicit type casting behaviors") {
+    withTempDir { tmp =>
+      // Define test cases for successful implicit casting
+      case class TypeCastTestCase(
+          sourceType: String,
+          targetType: String,
+          testValue: String,
+          expectedValue: Any,
+          description: String
+      )
+
+      val successfulTestCases = Seq(
+        // Numeric widening conversions (always safe)
+        TypeCastTestCase("tinyint", "smallint", "127", 127, "tinyint to smallint widening"),
+        TypeCastTestCase("tinyint", "int", "127", 127, "tinyint to int widening"),
+        TypeCastTestCase("tinyint", "bigint", "127", 127L, "tinyint to bigint widening"),
+        TypeCastTestCase("tinyint", "float", "127", 127.0f, "tinyint to float widening"),
+        TypeCastTestCase("tinyint", "double", "127", 127.0d, "tinyint to double widening"),
+        TypeCastTestCase("tinyint", "decimal(10,1)", "127", java.math.BigDecimal.valueOf(127.0), "tinyint to decimal widening"),
+
+        TypeCastTestCase("smallint", "int", "32767", 32767, "smallint to int widening"),
+        TypeCastTestCase("smallint", "bigint", "32767", 32767L, "smallint to bigint widening"),
+        TypeCastTestCase("smallint", "float", "32767", 32767.0f, "smallint to float widening"),
+        TypeCastTestCase("smallint", "double", "32767", 32767.0d, "smallint to double widening"),
+        TypeCastTestCase("smallint", "decimal(10,1)", "32767", java.math.BigDecimal.valueOf(32767.0), "smallint to decimal widening"),
+
+        TypeCastTestCase("int", "bigint", "2147483647", 2147483647L, "int to bigint widening"),
+        TypeCastTestCase("int", "float", "2147483647", 2147483647.0f, "int to float widening"),
+        TypeCastTestCase("int", "double", "2147483647", 2147483647.0d, "int to double widening"),
+        TypeCastTestCase("int", "decimal(10,1)", "22", java.math.BigDecimal.valueOf(22.0), "int to decimal widening"),
+
+        // double value would have some epsilon error which is expected.
+        TypeCastTestCase("float", "double", "3.14", 3.140000104904175d, "float to double widening"),
+        TypeCastTestCase("float", "decimal(10,2)", "3.14", java.math.BigDecimal.valueOf(3.14).setScale(2, java.math.RoundingMode.HALF_UP), "float to decimal"),
+
+        // Numeric narrowing conversions (potential data loss)
+        TypeCastTestCase("double", "int", "123.45", 123, "double to int - truncates decimal"),
+        TypeCastTestCase("decimal(10,2)", "int", "123.45", 123, "decimal to int - truncates decimal"),
+
+        // Boolean conversions
+        TypeCastTestCase("boolean", "string", "true", "true", "boolean to string"),
+
+        // Timestamp/Date conversions
+        TypeCastTestCase("timestamp", "string", "timestamp'2023-01-01 12:00:00'", "2023-01-01 12:00:00", "timestamp to string"),
+        TypeCastTestCase("timestamp", "date", "timestamp'2023-01-01 12:00:00'", java.sql.Date.valueOf("2023-01-01"), "timestamp to date"),
+        TypeCastTestCase("date", "string", "date'2023-01-01'", "2023-01-01", "date to string"),
+        TypeCastTestCase("date", "timestamp", "date'2023-01-01'", java.sql.Timestamp.valueOf("2023-01-01 00:00:00"), "date to timestamp")
+      )
+
+      val tableName = generateTableName
+
+      // Create columns definition dynamically
+      val columnsDefinition = successfulTestCases.zipWithIndex.map { case (test, idx) =>
+        s"col_${idx} ${test.targetType}"
+      }.mkString(",\n  ")
+
+      // Create single table with all target type columns
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  $columnsDefinition,
+           |  ts long
+           |) using hudi
+           |location '${tmp.getCanonicalPath}/$tableName'
+           |tblproperties (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           |)
+         """.stripMargin)
+
+      // Generate insert values
+      val insertValues = successfulTestCases.zipWithIndex.map { case (test, idx) =>
+        s"cast(${test.testValue} as ${test.sourceType}) as col_${idx}"
+      }.mkString(",\n  ")
+
+      // Insert all test values in one go
+      spark.sql(
+        s"""
+           |insert into $tableName
+           |select
+           |  1 as id,
+           |  $insertValues,
+           |  1000 as ts
+         """.stripMargin)
+
+      // Verify each column value
+      val result = spark.sql("select "
+        + successfulTestCases.zipWithIndex.map { case (_, idx) => s"col_$idx" }.mkString(",\n  ")
+        + s" from $tableName where id = 1").collect()(0)
+      successfulTestCases.zipWithIndex.foreach { case (test, idx) =>
+        assert(result(idx) == test.expectedValue,
+          s"${test.description}: Expected ${test.expectedValue} but got 
${result(idx)}")
+      }
+    }
+  }
+
+  test("Test Spark disallowed implicit type casting behaviors") {
+    // Capturing the current behavior of Spark's implicit type casting.
+    withTempDir { tmp =>
+      // Define test cases for implicit casting
+      case class TypeCastTestCase(sourceType: String,
+                                  targetType: String,
+                                  testValue: String, // SQL literal expression
+                                  expectedValue: Any,
+                                  description: String = "")
+
+      val testCases = Seq(
+        TypeCastTestCase("int", "decimal(10,1)", "2147483647", java.math.BigDecimal.valueOf(2147483647.0), "int to decimal widening overflow"),
+
+        // String conversions
+        TypeCastTestCase("string", "int", "'123'", 123, "string to int - valid numeric string"),
+        TypeCastTestCase("string", "double", "'12.34'", 12.34d, "string to double - valid numeric string"),
+        TypeCastTestCase("string", "double", "'abc'", null, "string to double - invalid numeric string"),
+        TypeCastTestCase("string", "boolean", "'abc'", null, "string to boolean - invalid boolean string"),
+        TypeCastTestCase("string", "timestamp", "'2023-01-01'", java.sql.Timestamp.valueOf("2023-01-01 00:00:00"), "string to timestamp - valid date string"),
+        TypeCastTestCase("string", "date", "'2023-01-01'", java.sql.Date.valueOf("2023-01-01"), "string to date - valid date string"),
+
+        // Numeric narrowing conversions (potential data loss)
+        TypeCastTestCase("double", "int", s"${Int.MaxValue.toDouble + 1}", null, "double to int - overflow"),
+        TypeCastTestCase("bigint", "int", "2147483648", null, "bigint to int - overflow"),
+
+        // Boolean conversions
+        TypeCastTestCase("boolean", "int", "true", 1, "boolean to int")
+      )
+
+      testCases.foreach { testCase =>
+        val tableName = generateTableName
+
+        // Create table with target type
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  value ${testCase.targetType},
+             |  ts long
+             |) using hudi
+             |location '${tmp.getCanonicalPath}/$tableName'
+             |tblproperties (
+             |  primaryKey = 'id',
+             |  preCombineField = 'ts'
+             |)
+         """.stripMargin)
+
+        // Test failed conversion
+        val exception = intercept[Exception] {
+          spark.sql(
+            s"""
+               |insert into $tableName
+               |select 1 as id, cast(${testCase.testValue} as ${testCase.sourceType}) as value, 1000 as ts
+           """.stripMargin)
+        }
+
+        val exceptionMsg = exception.getMessage
+        val exceptionCauseMsg = Option(exception.getCause).map(_.getMessage).getOrElse("")
+        assert(isIncompatibleDataException(exception),
+          s"${testCase.description}: Expected casting related error but got different exception: " +
+            s"Message from the exception ${exceptionMsg}, message from the exception cause ${exceptionCauseMsg}")
+      }
+    }
+  }
+
+  test("Test All Valid Type Casting For Merge Into and Insert") {
+    // For all valid type casting pairs, test merge into and insert operations.
+    // Define the column types for testing, based on successful casting cases
+    case class ColumnTypePair(sourceType: String,
+                              targetType: String,
+                              testValue: String,
+                              expectedValue: Any,
+                              columnName: String)
+
+    // Define valid type casting pairs based on the previous test cases
+    val validTypePairs = Seq(
+      // Numeric widening pairs
+      ColumnTypePair("tinyint", "smallint", "127", 127, "tiny_to_small"),
+      ColumnTypePair("tinyint", "int", "127", 127, "tiny_to_int"),
+      ColumnTypePair("tinyint", "bigint", "127", 127L, "tiny_to_big"),
+      ColumnTypePair("tinyint", "float", "127", 127.0f, "tiny_to_float"),
+      ColumnTypePair("tinyint", "double", "127", 127.0d, "tiny_to_double"),
+      ColumnTypePair("tinyint", "decimal(10,1)", "127", java.math.BigDecimal.valueOf(127.0), "tiny_to_decimal"),
+
+      ColumnTypePair("smallint", "int", "32767", 32767, "small_to_int"),
+      ColumnTypePair("smallint", "bigint", "32767", 32767L, "small_to_big"),
+      ColumnTypePair("smallint", "float", "32767", 32767.0f, "small_to_float"),
+      ColumnTypePair("smallint", "double", "32767", 32767.0d, "small_to_double"),
+      ColumnTypePair("smallint", "decimal(10,1)", "32767", java.math.BigDecimal.valueOf(32767.0), "small_to_decimal"),
+
+      ColumnTypePair("int", "bigint", "2147483647", 2147483647L, "int_to_big"),
+      ColumnTypePair("int", "float", "2147483647", 2147483647.0f, "int_to_float"),
+      ColumnTypePair("int", "double", "2147483647", 2147483647.0d, "int_to_double"),
+      ColumnTypePair("int", "decimal(10,1)", "22", java.math.BigDecimal.valueOf(22.0), "int_to_decimal"),
+
+      ColumnTypePair("float", "double", "3.14", 3.140000104904175d, "float_to_double"),
+      ColumnTypePair("float", "decimal(10,2)", "3.14", java.math.BigDecimal.valueOf(3.14).setScale(2, java.math.RoundingMode.HALF_UP), "float_to_decimal"),
+
+      // Timestamp/Date conversions
+      ColumnTypePair("timestamp", "string", "timestamp'2023-01-01 12:00:00'", "2023-01-01 12:00:00", "ts_to_string"),
+      ColumnTypePair("timestamp", "date", "timestamp'2023-01-01 12:00:00'", java.sql.Date.valueOf("2023-01-01"), "ts_to_date"),
+      ColumnTypePair("date", "string", "date'2023-01-01'", "2023-01-01", "date_to_string"),
+      ColumnTypePair("date", "timestamp", "date'2023-01-01'", java.sql.Timestamp.valueOf("2023-01-01 00:00:00"), "date_to_ts"),
+
+      // Boolean conversions
+      ColumnTypePair("boolean", "string", "true", "true", "bool_to_string")
+    )
+
+    Seq("cow", "mor").foreach { tableType =>
+      withTempDir { tmp =>
+        val targetTable = generateTableName
+        val sourceTable = generateTableName
+
+        // Create column definitions for both tables
+        val targetColumns = validTypePairs.map(p => s"${p.columnName} ${p.targetType}").mkString(",\n  ")
+        val sourceColumns = validTypePairs.map(p => s"${p.columnName} ${p.sourceType}").mkString(",\n  ")
+
+        // Create target table.
+        spark.sql(
+          s"""
+             |create table $targetTable (
+             |  id int,
+             |  $targetColumns,
+             |  ts long
+             |) using hudi
+             |location '${tmp.getCanonicalPath}/$targetTable'
+             |tblproperties (
+             |  type = '$tableType',
+             |  primaryKey = 'id',
+             |  preCombineField = 'ts'
+             |)
+         """.stripMargin)
+
+        // Create source table
+        spark.sql(
+          s"""
+             |create table $sourceTable (
+             |  id int,
+             |  $sourceColumns,
+             |  ts long
+             |) using hudi
+             |location '${tmp.getCanonicalPath}/$sourceTable'
+             |tblproperties (
+             |  type = '$tableType',
+             |  primaryKey = 'id',
+             |  preCombineField = 'ts'
+             |)
+         """.stripMargin)
+
+        // Insert initial data into target table
+        val targetInsertValues = validTypePairs.map(_ => "null").mkString(", ")
+        spark.sql(
+          s"""
+             |insert into $targetTable
+             |select 1 as id, $targetInsertValues, 1000 as ts
+         """.stripMargin)
+
+        // Insert data into source table with test values
+        val sourceValues = validTypePairs.map(p => s"cast(${p.testValue} as ${p.sourceType})").mkString(", ")
+        spark.sql(
+          s"""
+             |insert into $sourceTable
+             |select 1 as id, $sourceValues, 1001 as ts
+         """.stripMargin)
+
+        // Perform merge operation
+        spark.sql(
+          s"""
+             |merge into $targetTable t
+             |using $sourceTable s
+             |on t.id = s.id
+             |when matched then update set *
+             |when not matched then insert *
+         """.stripMargin)
+
+        // Verify results
+        val c = validTypePairs.map(p => s"${p.columnName}").mkString(",\n  ")
+        val result = spark.sql(s"select $c from $targetTable where id = 1").collect()(0)
+        validTypePairs.zipWithIndex.foreach { case (pair, idx) =>
+          val actualValue = result.get(idx) // only the pair columns are selected, so no offset for id
+          assert(actualValue == pair.expectedValue,
+            s"${tableType.toUpperCase}: Column ${pair.columnName} - Expected ${pair.expectedValue} (${pair.expectedValue.getClass}) but got $actualValue (${if (actualValue != null) actualValue.getClass else "null"})")
+        }
+
+        // Test insert case
+        val sourceValues2 = validTypePairs.map(p => s"cast(${p.testValue} as ${p.sourceType})").mkString(", ")
+        spark.sql(
+          s"""
+             |insert into $sourceTable
+             |select 2 as id, $sourceValues2, 1002 as ts
+         """.stripMargin)
+
+        spark.sql(
+          s"""
+             |merge into $targetTable t
+             |using $sourceTable s
+             |on t.id = s.id
+             |when matched then update set *
+             |when not matched then insert *
+         """.stripMargin)
+        // Verify inserted row
+        val result2 = spark.sql(s"select * from $targetTable where id = 2").collect()(0)
+        validTypePairs.zipWithIndex.foreach { case (pair, idx) =>
+          val actualValue = result2.get(idx + 1) // +1 because id is the first column in select *
+          assert(actualValue == pair.expectedValue,
+            s"${tableType.toUpperCase}: Insert - Column ${pair.columnName} - Expected ${pair.expectedValue} (${pair.expectedValue.getClass}) but got $actualValue (${if (actualValue != null) actualValue.getClass else "null"})")
+        }
+      }
+    }
+  }
+
+  test("Test Column Type Mismatches for MergeInto Delete Actions") {
+    Seq("mor").foreach { tableType =>
+      withTempDir { tmp =>
+        def createTargetTable(partitionCol: String, partitionType: String): String = {
+          val targetTable = generateTableName
+          spark.sql(
+            s"""
+               |create table $targetTable (
+               |  id long,
+               |  name string,
+               |  value_double double,
+               |  ts long,
+               |  $partitionCol $partitionType
+               |) using hudi
+               |partitioned by ($partitionCol)
+               |location '${tmp.getCanonicalPath}/$targetTable'
+               |tblproperties (
+               |  type = '$tableType',
+               |  primaryKey = 'id',
+               |  preCombineField = 'ts'
+               |)
+         """.stripMargin)
+          targetTable
+        }
+
+        // Scenario 1: Successful merge with partition column (both partition and pk can be cast)
+        {
+          val targetTable = createTargetTable("part_col", "long")
+
+          // Insert initial data into target table
+          spark.sql(
+            s"""
+               |insert into $targetTable
+               |select
+               |  cast(id as long) as id,
+               |  name,
+               |  value_double,
+               |  ts,
+               |  cast(part_col as long) as part_col
+               |from (
+               |  select 1 as id, 'record1' as name, 1.1 as value_double, 1000 as ts, 100 as part_col
+               |  union all
+               |  select 2 as id, 'record2' as name, 2.2 as value_double, 1000 as ts, 200 as part_col
+               |)
+         """.stripMargin)
+
+          // Merge using inline subquery instead of source table
+          spark.sql(
+            s"""
+               |merge into $targetTable t
+               |using (
+               |  select
+               |    cast(1 as int) as id,
+               |    cast('updated1' as string) as name,
+               |    cast(1.11 as double) as value_double,
+               |    cast(1001 as long) as ts,
+               |    cast(100 as int) as part_col,
+               |    cast('Y' as string) as delete_flag
+               |) s
+               |on t.id = s.id and t.part_col = s.part_col
+               |when matched and s.delete_flag = 'Y' then delete
+         """.stripMargin)
+
+          checkAnswer(s"select id, name, value_double, ts, part_col from $targetTable order by id")(
+            Seq(2L, "record2", 2.2, 1000L, 200L))
+        }
+
+        // Scenario 2: Partition column type not cast-able
+        {
+          val targetTable = createTargetTable("part_col", "boolean")
+
+          // Insert initial data into target table with boolean partition
+          spark.sql(
+            s"""
+               |insert into $targetTable
+               |select
+               |  cast(id as long) as id,
+               |  name,
+               |  value_double,
+               |  ts,
+               |  true as part_col
+               |from (
+               |  select 1 as id, 'record1' as name, 1.1 as value_double, 1000 as ts
+               |)
+         """.stripMargin)
+
+          val sourceTableSubQuery =
+            s"""
+               |  select
+               |    cast(1 as int) as id,
+               |    cast('updated1' as string) as name,
+               |    cast(1.11 as double) as value_double,
+               |    cast(1001 as long) as ts,
+               |    cast('2024-01-01' as date) as part_col,
+               |    cast('Y' as string) as delete_flag
+               |""".stripMargin
+          // Should fail with cast related error due to incompatible partition types
+          val e1 = intercept[Exception] {
+            spark.sql(
+              s"""
+                 |merge into $targetTable t
+                 |using ($sourceTableSubQuery) s
+                 |on t.id = s.id and t.part_col = s.part_col
+                 |when matched and s.delete_flag = 'Y' then delete
+           """.stripMargin)
+          }
+          assert(
+            e1.getMessage.contains(
+              "the left and right operands of the binary operator have incompatible types " +
+                "(\"BOOLEAN\" and \"DATE\")")
+            || e1.getMessage.contains(
+              "cannot resolve '(t.part_col = s.part_col)' due to data type mismatch: differing types" +
+                " in '(t.part_col = s.part_col)' (boolean and date)."))
+
+          spark.sql(
+            s"""
+               |merge into $targetTable t
+               |using ($sourceTableSubQuery) s
+               |on t.id = s.id
+               |when matched and s.delete_flag = 'Y' then delete
+           """.stripMargin)
+
+          checkAnswer(s"select id, name, value_double, ts, part_col from $targetTable order by id")(
+            Seq(1L, "record1", 1.1, 1000L, true))
+        }
+
+        // Scenario 3: Failed merge due to primary key type mismatch
+        {
+          val targetTable = createTargetTable("part_col", "long")
+
+          // Insert initial data
+          spark.sql(
+            s"""
+               |insert into $targetTable
+               |select
+               |  cast(id as long) as id,
+               |  name,
+               |  value_double,
+               |  ts,
+               |  part_col
+               |from (
+               |  select 1 as id, 'record1' as name, 1.1 as value_double, 1000 as ts, 100 as part_col
+               |)
+         """.stripMargin)
+
+          val e2 = intercept[Exception] {
+            spark.sql(
+              s"""
+                 |merge into $targetTable t
+                 |using (
+                 |  select
+                 |    cast(1.0 as double) as id,
+                 |    cast('updated1' as string) as name,
+                 |    cast(1.11 as double) as value_double,
+                 |    cast(1001 as long) as ts,
+                 |    cast(100 as long) as part_col,
+                 |    cast('Y' as string) as delete_flag
+                 |) s
+                 |on t.id = s.id
+                 |when matched and s.delete_flag = 'Y' then delete
+           """.stripMargin)
+          }
+          assert(e2.getMessage.contains("Invalid MERGE INTO matching condition: s.id: can't cast s.id (of DoubleType) to LongType"))
+        }
+      }
+    }
+  }
+
+  test("Test Column Type Mismatches for MergeInto Insert and Update Actions") {
+    // Define test cases
+    case class TypeMismatchTestCase(
+                                     description: String,
+                                     targetSchema: Seq[(String, String)], // (colName, colType)
+                                     sourceSchema: Seq[(String, String)],
+                                     partitionCols: Seq[String],
+                                     primaryKey: String,
+                                     preCombineField: String,
+                                     tableType: String, // COW or MOR
+                                     expectedErrorPattern: String
+                                   )
+
+    val testCases = Seq(
+      TypeMismatchTestCase(
+        description = "Partition column type mismatch",
+        targetSchema = Seq(
+          "id" -> "int",
+          "name" -> "string",
+          "price" -> "int",
+          "ts" -> "long"
+        ),
+        sourceSchema = Seq(
+          "id" -> "int",
+          "name" -> "int", // mismatched type
+          "price" -> "int",
+          "ts" -> "long"
+        ),
+        partitionCols = Seq("name", "price"),
+        primaryKey = "id",
+        preCombineField = "ts",
+        tableType = "cow",
+        expectedErrorPattern = "Partition key data type mismatch between 
source table and target table. Target table uses StringType for column 'name', 
source table uses IntegerType for 's0.name'"
+      ),
+      TypeMismatchTestCase(
+        description = "Primary key type mismatch",
+        targetSchema = Seq(
+          "id" -> "int",
+          "name" -> "string",
+          "price" -> "int",
+          "ts" -> "long"
+        ),
+        sourceSchema = Seq(
+          "id" -> "long", // mismatched type
+          "name" -> "string",
+          "price" -> "int",
+          "ts" -> "long"
+        ),
+        partitionCols = Seq("name", "price"),
+        primaryKey = "id",
+        preCombineField = "ts",
+        tableType = "mor",
+        expectedErrorPattern = "Primary key data type mismatch between source 
table and target table. Target table uses IntegerType for column 'id', source 
table uses LongType for 's0.id'"
+      ),
+      TypeMismatchTestCase(
+        description = "Precombine field type mismatch",
+        targetSchema = Seq(
+          "id" -> "int",
+          "name" -> "string",
+          "price" -> "int",
+          "ts" -> "long"
+        ),
+        sourceSchema = Seq(
+          "id" -> "int",
+          "name" -> "string",
+          "price" -> "int",
+          "ts" -> "int" // mismatched type
+        ),
+        partitionCols = Seq("name", "price"),
+        primaryKey = "id",
+        preCombineField = "ts",
+        tableType = "cow",
+        expectedErrorPattern = "Precombine field data type mismatch between 
source table and target table. Target table uses LongType for column 'ts', 
source table uses IntegerType for 's0.ts'"
+      )
+    )
+
+    def createTable(tableName: String, schema: Seq[(String, String)], partitionCols: Seq[String],
+                    primaryKey: String, preCombineField: String, tableType: String, location: String): Unit = {
+      val schemaStr = schema.map { case (name, dataType) => s"$name $dataType" }.mkString(",\n  ")
+      val partitionColsStr = if (partitionCols.nonEmpty) s"partitioned by (${partitionCols.mkString(", ")})" else ""
+
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  $schemaStr
+           |) using hudi
+           |$partitionColsStr
+           |location '$location'
+           |tblproperties (
+           |  type = '$tableType',
+           |  primaryKey = '$primaryKey',
+           |  preCombineField = '$preCombineField'
+           |)
+       """.stripMargin)
+    }
+
+    // Run test cases
+    testCases.foreach { testCase =>
+      withSparkSqlSessionConfig(s"${DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key}" -> "false") {
+        withTempDir { tmp =>
+          val targetTable = generateTableName
+
+          // Create only target table
+          createTable(
+            targetTable,
+            testCase.targetSchema,
+            testCase.partitionCols,
+            testCase.primaryKey,
+            testCase.preCombineField,
+            testCase.tableType,
+            s"${tmp.getCanonicalPath}/$targetTable"
+          )
+
+          // Insert sample data into target table
+          spark.sql(
+            s"""
+               |insert into $targetTable
+               |select 1 as id, 'John Doe' as name, 19 as price, 1598886000 as ts
+               |union all
+               |select 2, 'Jane Doe', 24, 1598972400
+           """.stripMargin)
+
+          // Test UPDATE action with inline subquery
+          val updateQuery =
+            s"""
+               |merge into $targetTable t
+               |using (
+               |  select
+               |    cast(1 as ${testCase.sourceSchema.find(_._1 == "id").get._2}) as id,
+               |    cast('John Doe' as ${testCase.sourceSchema.find(_._1 == "name").get._2}) as name,
+               |    cast(20 as ${testCase.sourceSchema.find(_._1 == "price").get._2}) as price,
+               |    cast(1598886001 as ${testCase.sourceSchema.find(_._1 == "ts").get._2}) as ts
+               |) s0
+               |on t.${testCase.primaryKey} = s0.${testCase.primaryKey}
+               |when matched then update set *
+             """.stripMargin
+
+          val updateError = intercept[AnalysisException] {
+            spark.sql(updateQuery)
+          }.getMessage
+
+          assert(updateError.contains(testCase.expectedErrorPattern),
+            s"UPDATE - Expected error pattern '${testCase.expectedErrorPattern}' not found in actual error: $updateError")
+
+          // Test INSERT action with inline subquery
+          val insertQuery =
+            s"""
+               |merge into $targetTable t
+               |using (
+               |  select
+               |    cast(3 as ${testCase.sourceSchema.find(_._1 == "id").get._2}) as id,
+               |    cast('Bob Smith' as ${testCase.sourceSchema.find(_._1 == "name").get._2}) as name,
+               |    cast(30 as ${testCase.sourceSchema.find(_._1 == "price").get._2}) as price,
+               |    cast(1598886002 as ${testCase.sourceSchema.find(_._1 == "ts").get._2}) as ts
+               |) s0
+               |on t.${testCase.primaryKey} = s0.${testCase.primaryKey}
+               |when not matched then insert *
+             """.stripMargin
+
+          val insertError = intercept[AnalysisException] {
+            spark.sql(insertQuery)
+          }.getMessage
+
+          assert(insertError.contains(testCase.expectedErrorPattern),
+            s"INSERT - Expected error pattern '${testCase.expectedErrorPattern}' not found in actual error: $insertError")
+        }
+      }
+    }
+  }
+
+  test("Test MergeInto with partition column type mismatch should throw") {
+    withSparkSqlSessionConfig(s"${DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key}" -> "false") {
+      withTempDir { tmp =>
+        val targetTable = generateTableName
+
+        // Create target table with string partition
+        spark.sql(
+          s"""
+             |create table $targetTable (
+             |  id int,
+             |  name long,
+             |  ts int
+             |) using hudi
+             |partitioned by (name)
+             |location '${tmp.getCanonicalPath}/$targetTable'
+             |tblproperties (
+             |  type = 'cow',
+             |  primaryKey = 'id',
+             |  preCombineField = 'ts'
+             |)
+         """.stripMargin)
+
+        // Insert sample data
+        spark.sql(
+          s"""
+             |insert into $targetTable
+             |select 1 as id, 124L as name, 1000 as ts
+       """.stripMargin)
+
+        val e = intercept[AnalysisException] {
+          spark.sql(
+            s"""
+               |merge into $targetTable t
+               |using (
+               |  select
+               |    cast(1 as int) as id,
+               |    cast(123 as int) as name,
+               |    cast(1001 as long) as ts
+               |) s
+               |on t.id = s.id
+               |when matched then update set name = s.name
+         """.stripMargin)
+        }
+        assert(e.getMessage.contains("data type mismatch between source table and target table"))
+      }
+    }
+  }
+
+  test("Test MergeInto with precombine column type mismatch behavior based on 
record.merge.mode") {
+    
withSparkSqlSessionConfig(s"${DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key}"
 -> "false") {
+      withTempDir { tmp =>
+        Seq("EVENT_TIME_ORDERING", "COMMIT_TIME_ORDERING").foreach { mergeMode 
=>
+          val targetTable = generateTableName
+
+          // Create target table with int ts
+          spark.sql(
+            s"""
+               |create table $targetTable (
+               |  id int,
+               |  name string,
+               |  ts int
+               |) using hudi
+               |partitioned by (name)
+               |location '${tmp.getCanonicalPath}/$targetTable'
+               |tblproperties (
+               |  type = 'cow',
+               |  primaryKey = 'id',
+               |  preCombineField = 'ts',
+               |  'hoodie.record.merge.mode' = '$mergeMode'
+               |)
+           """.stripMargin)
+
+          // Insert sample data
+          spark.sql(
+            s"""
+               |insert into $targetTable
+               |select 1 as id, 'John' as name, 1000 as ts
+         """.stripMargin)
+
+          if (mergeMode == "EVENT_TIME_ORDERING") {
+            // Should throw exception for EVENT_TIME_ORDERING
+            val e = intercept[AnalysisException] {
+              spark.sql(
+                s"""
+                   |merge into $targetTable t
+                   |using (
+                   |  select
+                   |    cast(1 as int) as id,
+                   |    cast('John' as string) as name,
+                   |    cast(1001 as long) as ts
+                   |) s
+                   |on t.id = s.id
+                   |when matched then update set ts = s.ts
+               """.stripMargin)
+            }
+            assert(e.getMessage.contains("data type mismatch between source table and target table"))
+          } else {
+            // Should succeed for COMMIT_TIME_ORDERING
+            spark.sql(
+              s"""
+                 |merge into $targetTable t
+                 |using (
+                 |  select
+                 |    cast(1 as int) as id,
+                 |    cast('John' as string) as name,
+                 |    cast(1001 as long) as ts
+                 |) s
+                 |on t.id = s.id
+                 |when matched then update set ts = s.ts
+             """.stripMargin)
+
+            // Verify the update succeeded
+            checkAnswer(s"select id, name, ts from $targetTable where id = 1")(
+              Seq(1, "John", 1001)
+            )
+          }
+        }
+      }
+    }
+  }
+}
+
+object ErrorMessageChecker {
+  private val incompatibleDataPatterns = Set(
+    "Cannot write incompatible data to table",
+    "overflow",
+    "cannot be cast",
+    "Cannot safely cast",
+    "Conversion of",
+    "Failed to parse",
+    "cannot be represented as Decimal"
+  )
+
+  def containsIncompatibleDataError(message: String): Boolean = {
+    incompatibleDataPatterns.exists(message.contains)
+  }
+
+  def isIncompatibleDataException(exception: Exception): Boolean = {
+    containsIncompatibleDataError(exception.getMessage) ||
+      Option(exception.getCause)
+        .exists(cause => containsIncompatibleDataError(cause.getMessage))
+  }
+}
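
ErrorMessageChecker above centralizes the substrings Spark and Hudi emit for
incompatible-data failures, so a test can classify an error without pinning one
exact message. A minimal usage sketch (hypothetical test body; `tableName` and
the ScalaTest `intercept` harness are assumed from the surrounding suite, not
part of this diff):

    // Classify an intercepted failure as a type-incompatibility error;
    // isIncompatibleDataException also inspects the exception's cause.
    val thrown = intercept[Exception] {
      spark.sql(s"insert into $tableName values (1, 'a1', 'not-a-number', 1000)")
    }
    assert(ErrorMessageChecker.isIncompatibleDataException(thrown),
      s"expected an incompatible-data error, got: ${thrown.getMessage}")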
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestAlterTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestAlterTable.scala
index 6bd084411cd..d32cbbd435e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestAlterTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestAlterTable.scala
@@ -26,7 +26,6 @@ import org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serializeComm
 import org.apache.hudi.table.HoodieSparkTable
 import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
 
-import org.apache.hadoop.fs.Path
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
@@ -123,7 +122,7 @@ class TestAlterTable extends HoodieSparkSqlTestBase {
           s"""
              |merge into $newTableName t0
              |using (
-             |  select 1 as id, 'a1' as name, 12 as price, 1001 as ts, 'e0' as ext0
+             |  select 1 as id, 'a1' as name, 12 as price, 1001L as ts, 'e0' as ext0
              |) s0
              |on t0.id = s0.id
              |when matched then update set *
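
The `1001L` literal above is the recurring fix throughout this commit: the
MERGE INTO source must now produce exactly the target column's type for the
required columns. A minimal sketch of the pattern, assuming a Hudi target
table `$tableName` whose precombine field is `ts bigint` under event-time
ordering (names are illustrative):

    // An int literal no longer implicitly widens to the bigint precombine column:
    val e = intercept[AnalysisException] {
      spark.sql(s"merge into $tableName t using (select 1 as id, 1001 as ts) s " +
        "on t.id = s.id when matched then update set ts = s.ts")
    }
    // A long literal matches the target type and passes validation:
    spark.sql(s"merge into $tableName t using (select 1 as id, 1001L as ts) s " +
      "on t.id = s.id when matched then update set ts = s.ts")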
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
index 3942af2b145..7c1bc0aa2a2 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
@@ -214,7 +214,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
         s"""
            |merge into ${targetTable} as target
            |using (
-           |select '2' as id, 'bb' as name, 456 as dt, '2024-02-19' as `day`, 10 as `hour`
+           |select '2' as id, 'bb' as name, 456L as dt, '2024-02-19' as `day`, 10 as `hour`
            |) as source
            |on target.id = source.id
            |when matched then update set *
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoLogOnlyTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoLogOnlyTable.scala
index b8ab95e7653..1ecfa4e98ff 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoLogOnlyTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoLogOnlyTable.scala
@@ -58,7 +58,7 @@ class TestMergeIntoLogOnlyTable extends HoodieSparkSqlTestBase {
         s"""
            |merge into $tableName h0
            |using (
-           | select 1 as id, 'a1' as name, 11 as price, 1001 as ts
+           | select 1 as id, 'a1' as name, 11 as price, 1001L as ts
            | ) s0
            | on h0.id = s0.id
            | when matched then update set *
@@ -74,7 +74,7 @@ class TestMergeIntoLogOnlyTable extends HoodieSparkSqlTestBase {
         s"""
            |merge into $tableName h0
            |using (
-           | select 4 as id, 'a4' as name, 11 as price, 1000 as ts
+           | select 4 as id, 'a4' as name, 11 as price, 1000L as ts
            | ) s0
            | on h0.id = s0.id
            | when not matched then insert *
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable.scala
index daf07b8e0f0..d2f16599db3 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable.scala
@@ -47,7 +47,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
              |  id int,
              |  name string,
              |  price double,
-             |  ts long
+             |  ts int
              |) using hudi
              | location '${tmp.getCanonicalPath}'
              | tblproperties (
@@ -63,7 +63,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
           StructField("id", IntegerType, nullable = true),
           StructField("name", StringType, nullable = true),
           StructField("price", DoubleType, nullable = true),
-          StructField("ts", LongType, nullable = true))
+          StructField("ts", IntegerType, nullable = true))
         // First merge with a extra input field 'flag' (insert a new record)
         spark.sql(
           s"""
@@ -155,7 +155,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
            |  name string,
            |  data int,
            |  country string,
-           |  ts bigint
+           |  ts int
            |) using hudi
            |tblproperties (
            |  type = 'cow',
@@ -169,7 +169,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
         s"""
            |merge into $targetTable as target
            |using (
-           |select 1 as id, 'lb' as name, 6 as data, 'shu' as country, 1646643193 as ts
+           |select 1 as id, 'lb' as name, 6 as data, 'shu' as country, 43193 as ts
            |) source
            |on source.id = target.id
            |when matched then
@@ -181,7 +181,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
         s"""
            |merge into $targetTable as target
            |using (
-           |select 1 as id, 'lb' as name, 5 as data, 'shu' as country, 1646643196 as ts
+           |select 1 as id, 'lb' as name, 5 as data, 'shu' as country, 43196 as ts
            |) source
            |on source.id = target.id
            |when matched and source.data > target.data then
@@ -193,7 +193,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase 
with ScalaAssertionSuppo
            |""".stripMargin)
 
       checkAnswer(s"select id, name, data, country, ts from $targetTable")(
-        Seq(1, "lb", 5, "shu", 1646643196L)
+        Seq(1, "lb", 5, "shu", 43196)
       )
     }
   }
@@ -285,7 +285,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
            |  id int,
            |  name string,
            |  price double,
-           |  ts long
+           |  ts int
            | ) using parquet
            | location '${tmp.getCanonicalPath}/$sourceTable'
          """.stripMargin)
@@ -296,7 +296,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
            |  id int,
            |  name string,
            |  price double,
-           |  ts long
+           |  ts int
            |) using hudi
            | location '${tmp.getCanonicalPath}/$targetTable'
            | tblproperties (
@@ -447,7 +447,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
            |  id int,
            |  name string,
            |  price double,
-           |  ts long,
+           |  ts int,
            |  dt string
            | ) using hudi
            | tblproperties (
@@ -462,7 +462,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
         StructField("id", IntegerType, nullable = true),
         StructField("name", StringType, nullable = true),
         StructField("price", DoubleType, nullable = true),
-        StructField("ts", LongType, nullable = true),
+        StructField("ts", IntegerType, nullable = true),
         StructField("dt", StringType, nullable = true))
 
       // Insert data
@@ -470,7 +470,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
         s"""
            | merge into $tableName as t0
            | using (
-           |  select 1 as id, 'a1' as name, 10 as price, 1000L as ts, '2021-03-21' as dt
+           |  select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '2021-03-21' as dt
            | ) as s0
            | on t0.id = s0.id
            | when not matched and s0.id % 2 = 1 then insert *
@@ -485,7 +485,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
         s"""
            | merge into $tableName as t0
            | using (
-           |  select 1 as _id, 'a1' as name, 12 as _price, 1001L as _ts, '2021-03-21' as dt
+           |  select 1 as _id, 'a1' as name, 12 as _price, 1001 as _ts, '2021-03-21' as dt
            | ) as s0
            | on t0.id = s0._id
            | when matched and s0._id % 2 = 0 then update set
@@ -501,7 +501,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
         s"""
            | merge into $tableName as t0
            | using (
-           |  select 1 as _id, 'a1' as name, 12 as _price, 1001L as _ts, '2021-03-21' as dt
+           |  select 1 as _id, 'a1' as name, 12 as _price, 1001 as _ts, '2021-03-21' as dt
            | ) as s0
            | on t0.id = s0._id
            | when matched and s0._id % 2 = 1 then update set
@@ -517,7 +517,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
         s"""
            | merge into $tableName as t0
            | using (
-           |  select 2 as id, 'a2' as name, 10 as price, 1000L as ts, '2021-03-21' as dt
+           |  select 2 as id, 'a2' as name, 10 as price, 1000 as ts, '2021-03-21' as dt
            | ) as s0
            | on t0.id = s0.id
            | when not matched and s0.id % 2 = 0 then insert *
@@ -532,7 +532,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
         s"""
            | merge into $tableName t0
            | using (
-           |  select 2 as s_id, 'a2' as s_name, 15 as s_price, 1001L as s_ts, '2021-03-21' as dt
+           |  select 2 as s_id, 'a2' as s_name, 15 as s_price, 1001 as s_ts, '2021-03-21' as dt
            | ) s0
            | on t0.id = s0.s_id
            | when matched and s_ts = 1001
@@ -552,7 +552,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
         s"""
            | merge into $tableName t0
            | using (
-           |  select 1 as s_id, 'a2' as s_name, 15 as s_price, 1001L as s_ts, '2021-03-21' as dt
+           |  select 1 as s_id, 'a2' as s_name, 15 as s_price, 1001 as s_ts, '2021-03-21' as dt
            | ) s0
            | on t0.id = s0.s_id + 1
            | when matched and s_ts = 1001 then delete
@@ -563,7 +563,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
         s"""
            | merge into $tableName t0
            | using (
-           |  select 2 as s_id, 'a2' as s_name, 15 as s_price, 1001L as ts, '2021-03-21' as dt
+           |  select 2 as s_id, 'a2' as s_name, 15 as s_price, 1001 as ts, '2021-03-21' as dt
            | ) s0
            | on t0.id = s0.s_id
            | when matched and s0.ts = 1001 then delete
@@ -584,7 +584,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
       spark.sql(
         s"""
            | create table $tableName (
-           |  id bigint,
+           |  id int,
            |  name string,
            |  price double,
            |  dt string
@@ -644,7 +644,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
              |  id int,
              |  name string,
              |  price double,
-             |  v long,
+             |  v int,
              |  dt string
              | ) using hudi
              | tblproperties (
@@ -718,7 +718,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
              |  id int,
              |  name string,
              |  price double,
-             |  v long,
+             |  v int,
              |  dt string
              | ) using hudi
              | tblproperties (
@@ -754,8 +754,6 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
         //
         // 2) set source column name to be different with target column
         //
-        val errorMessage = "Failed to resolve precombine field `v` w/in the 
source-table output"
-
         checkException(
           s"""
              | merge into $tableName1 as t0
@@ -765,7 +763,9 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
              | on t0.id = s0.s_id
             | when matched then update set id=s0.s_id, name=s0.s_name, price=s0.s_price*2, v=s0.s_v+2, dt=s0.dt
          """.stripMargin
-        )(errorMessage)
+        )(
+          "MERGE INTO field resolution error: " +
+            "Failed to resolve precombine field `v` w/in the source-table 
output")
 
         spark.sql(
           s"""
@@ -795,7 +795,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
            |  id int,
            |  name string,
            |  price double,
-           |  v long,
+           |  v int,
            |  dt string
            | ) using hudi
            | tblproperties (
@@ -815,8 +815,6 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
       // Delete data with a condition expression on primaryKey field
       // 1) set source column name to be same as target column
       //
-      val complexConditionsErrorMessage = "Only simple conditions of the form `t.id = s.id` are allowed on the primary-key and partition path column. Found `t0.id = (s0.id + 1)`"
-
       checkException(
         s"""merge into $tableName1 t0
            | using (
@@ -824,7 +822,9 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
            | ) s0
            | on t0.id = s0.id + 1
            | when matched then delete
-       """.stripMargin)(complexConditionsErrorMessage)
+       """.stripMargin)(
+        "Only simple conditions of the form `t.id = s.id` are allowed on the 
primary-key "
+          + "and partition path column. Found `t0.id = (s0.id + 1)`")
 
       spark.sql(
         s"""
@@ -844,8 +844,6 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
       //
       // 2.a) set source column name to be different with target column (should fail unable to match precombine field)
       //
-      val failedToResolveErrorMessage = "Failed to resolve precombine field `v` w/in the source-table output"
-
       checkException(
         s"""merge into $tableName1 t0
            | using (
@@ -853,7 +851,9 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
            | ) s0
            | on t0.id = s0.s_id
            | when matched then delete
-           |""".stripMargin)(failedToResolveErrorMessage)
+           |""".stripMargin)(
+        "MERGE INTO field resolution error: "
+          + "Failed to resolve precombine field `v` w/in the source-table 
output")
 
       //
       // 2.b) set source column name to be different with target column
@@ -1079,7 +1079,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
           s"""
              | merge into $tableName
              | using (
-             |  select 1 as id, 'a1' as name, 10 as price, $dataValue as c, '1' as flag
+             |  select 1 as id, 'a1' as name, 10 as price, cast($dataValue as $dataType) as c, '1' as flag
              | ) s0
              | on s0.id = $tableName.id
              | when matched and flag = '1' then update set *
@@ -1092,7 +1092,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
           s"""
              | merge into $tableName
              | using (
-             |  select 1 as id, 'a1' as name, 10 as price, $dataValue as c
+             |  select 1 as id, 'a1' as name, 10 as price, cast($dataValue as $dataType) as c
              | ) s0
              | on s0.id = $tableName.id
              | when matched then update set
@@ -1117,7 +1117,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
            |  id int,
            |  name string,
            |  price double,
-           |  ts long
+           |  ts int
            |) using hudi
            | location '${tmp.getCanonicalPath}'
            | tblproperties (
@@ -1131,7 +1131,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
         StructField("id", IntegerType, nullable = true),
         StructField("name", StringType, nullable = true),
         StructField("price", DoubleType, nullable = true),
-        StructField("ts", LongType, nullable = true))
+        StructField("ts", IntegerType, nullable = true))
 
       spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
       spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)")
@@ -1188,7 +1188,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
              |  id int,
              |  name string,
              |  value $dataType,
-             |  ts long
+             |  ts int
              |) using hudi
              | location '${tmp.getCanonicalPath}/$tableName'
              | tblproperties (
@@ -1235,7 +1235,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
              |  id int,
              |  name string,
              |  value $dataType,
-             |  ts long
+             |  ts int
              |) using hudi
              | location '${tmp.getCanonicalPath}/$tableName'
              | tblproperties (
@@ -1269,7 +1269,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
            |  id int,
            |  name string,
            |  value int,
-           |  ts long
+           |  ts int
            |) using hudi
            | location '${tmp.getCanonicalPath}/$tableName'
            | tblproperties (
@@ -1362,7 +1362,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
            |  id int,
            |  name string,
            |  value int,
-           |  ts long
+           |  ts int
            |) using hudi
            | location '${tmp.getCanonicalPath}/$tableName'
            | tblproperties (
@@ -1373,45 +1373,6 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
 
       spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
 
-      // Can't down-cast incoming dataset's primary-key w/o loss of precision (should fail)
-      val errorMsg = "Invalid MERGE INTO matching condition: s0.id: can't cast s0.id (of LongType) to IntegerType"
-
-      checkExceptionContain(
-        s"""
-           |merge into $tableName h0
-           |using (
-           |  select cast(1 as long) as id, 1001 as ts
-           | ) s0
-           | on cast(h0.id as long) = s0.id
-           | when matched then update set h0.ts = s0.ts
-           |""".stripMargin)(errorMsg)
-
-      // Can't down-cast incoming dataset's primary-key w/o loss of precision (should fail)
-      checkExceptionContain(
-        s"""
-           |merge into $tableName h0
-           |using (
-           |  select cast(1 as long) as id, 1002 as ts
-           | ) s0
-           | on h0.id = s0.id
-           | when matched then update set h0.ts = s0.ts
-           |""".stripMargin)(errorMsg)
-
-      // Can up-cast incoming dataset's primary-key w/o loss of precision (should succeed)
-      spark.sql(
-        s"""
-           |merge into $tableName h0
-           |using (
-           |  select cast(1 as short) as id, 1003 as ts
-           | ) s0
-           | on h0.id = s0.id
-           | when matched then update set h0.ts = s0.ts
-           |""".stripMargin)
-
-      checkAnswer(s"select id, name, value, ts from $tableName")(
-        Seq(1, "a1", 10, 1003)
-      )
-
       // Can remove redundant symmetrical casting on both sides (should 
succeed)
       spark.sql(
         s"""
@@ -1439,7 +1400,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
         spark.sql(
           s"""
              | create table $tableName (
-             |  id bigint,
+             |  id int,
              |  name string,
              |  price double,
              |  dt string
@@ -1497,7 +1458,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
           spark.sql(
             s"""
                | create table $tableName (
-               |  id bigint,
+               |  id int,
                |  name string,
                |  price double,
                |  ts bigint,
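
The block deleted earlier in this file exercised long-to-int down-casts of the
record key, which used to fail later with "Invalid MERGE INTO matching
condition". With this commit the mismatch should instead be caught by the new
required-column type validation during analysis; a sketch of the now-failing
shape (assuming the same `id int` target, and that the record-key check
surfaces the same "data type mismatch" error as the partition-column tests):

    // Record key down-cast: rejected up front during MERGE INTO analysis.
    val e = intercept[AnalysisException] {
      spark.sql(
        s"""
           |merge into $tableName h0
           |using (select cast(1 as long) as id, 1001 as ts) s0
           |on h0.id = s0.id
           |when matched then update set h0.ts = s0.ts
           |""".stripMargin)
    }
    assert(e.getMessage.contains("data type mismatch between source table and target table"))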
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable2.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable2.scala
index bd8f7676e02..ea19f9812b7 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable2.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable2.scala
@@ -41,7 +41,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
            |  id int,
            |  name string,
            |  price double,
-           |  ts long,
+           |  ts int,
            |  dt string
            | ) using hudi
            | tblproperties (
@@ -57,7 +57,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
         s"""
            | merge into $tableName as t0
            | using (
-           |  select 1 as id, 'a1' as name, 10 as price, 1000L as ts, '2021-03-21' as dt
+           |  select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '2021-03-21' as dt
            | ) as s0
            | on t0.id = s0.id
            | when not matched and s0.id % 2 = 1 then insert *
@@ -72,7 +72,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
         s"""
            | merge into $tableName as t0
            | using (
-           |  select 2 as id, 'a2' as name, 10 as price, 1000L as ts, '2021-03-21' as dt
+           |  select 2 as id, 'a2' as name, 10 as price, 1000 as ts, '2021-03-21' as dt
            | ) as s0
            | on t0.id = s0.id
            | when not matched and s0.id % 2 = 1 then insert *
@@ -87,7 +87,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
         s"""
            | merge into $tableName as t0
            | using (
-           |  select 1 as id, 'a1' as name, 11 as price, 1000L as ts, '2021-03-21' as dt
+           |  select 1 as id, 'a1' as name, 11 as price, 1000 as ts, '2021-03-21' as dt
            | ) as s0
            | on t0.id = s0.id
            | when matched and s0.id % 2 = 0 then update set *
@@ -104,7 +104,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
         s"""
            | merge into $tableName as t0
            | using (
-           |  select 1 as id, 'a1' as name, 11 as price, 1000L as ts, '2021-03-21' as dt
+           |  select 1 as id, 'a1' as name, 11 as price, 1000 as ts, '2021-03-21' as dt
            | ) as s0
            | on t0.id = s0.id
            | when matched and s0.id % 2 = 1 then update set id = s0.id, name = s0.name,
@@ -121,7 +121,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
         s"""
            | merge into $tableName as t0
            | using (
-           |  select 1 as id, 'a1' as name, 11 as price, 1000L as ts, '2021-03-21' as dt
+           |  select 1 as id, 'a1' as name, 11 as price, 1000 as ts, '2021-03-21' as dt
            | ) as s0
            | on t0.id = s0.id
            | when matched and s0.id % 2 = 0 then update set id = s0.id, name = s0.name,
@@ -138,7 +138,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
         s"""
            | merge into $tableName as t0
            | using (
-           |  select 1 as id, 'a1' as name, 10 as price, 1000L as ts, '2021-03-21' as dt
+           |  select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '2021-03-21' as dt
            | ) as s0
            | on t0.id = s0.id
            | when matched and s0.id % 2 = 1 then update set id = s0.id, name = s0.name,
@@ -194,7 +194,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
            |  s_value struct<f0: int, f1: string>,
            |  a_value array<string>,
            |  m_value map<string, string>,
-           |  ts long
+           |  ts int
            | ) using hudi
            | tblproperties (
            |  type = 'mor',
@@ -257,7 +257,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
            |  id int,
            |  name string,
            |  price double,
-           |  ts long,
+           |  ts int,
            |  dt string
            |) using hudi
            | location '${tmp.getCanonicalPath}/$tableName'
@@ -345,7 +345,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
            |  id int,
            |  name string,
            |  price double,
-           |  ts long
+           |  ts int
            |) using hudi
            | location '${tmp.getCanonicalPath}/$tableName'
            | tblproperties (
@@ -390,7 +390,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
            |  id int,
            |  name string,
            |  price double,
-           |  ts long
+           |  ts int
            |) using hudi
            | location '${tmp.getCanonicalPath}/$tableName'
            | tblproperties (
@@ -457,7 +457,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
            |  ID int,
            |  name string,
            |  price double,
-           |  TS long,
+           |  ts int,
            |  DT string
            |) using hudi
            | location '${tmp.getCanonicalPath}/$tableName'
@@ -529,7 +529,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
            |  ID int,
            |  NAME string,
            |  price double,
-           |  TS long,
+           |  ts int,
            |  dt string
            | ) using hudi
            | options (
@@ -571,7 +571,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
            |  id int,
            |  name string,
            |  price double,
-           |  ts long,
+           |  ts int,
            |  dt string
            |) using hudi
            | location '${tmp.getCanonicalPath}/$tableName'
@@ -618,7 +618,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
            |  id2 int,
            |  name string,
            |  price double,
-           |  ts long,
+           |  ts int,
            |  dt string
            |) using hudi
            | location '${tmp.getCanonicalPath}/$tableName'
@@ -664,7 +664,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
            |  id int,
            |  name string,
            |  price double,
-           |  ts long,
+           |  ts int,
            |  dt string
            | ) using hudi
            | tblproperties (
@@ -702,7 +702,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
            |  id int,
            |  name string,
            |  price double,
-           |  ts long,
+           |  ts int,
            |  dt string
            | ) using hudi
            | tblproperties (
@@ -742,7 +742,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
            | create table $tableName (
            |  id int,
            |  name string,
-           |  ts long
+           |  ts int
            | ) using hudi
            | tblproperties (
            |  type = 'cow',
@@ -783,7 +783,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
                |  id int,
                |  name string,
                |  price double,
-               |  ts long,
+               |  ts int,
                |  dt string
                | ) using hudi
                | tblproperties (
@@ -848,7 +848,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
                |  id int,
                |  name string,
                |  price double,
-               |  ts long,
+               |  ts int,
                |  dt string
                | ) using hudi
                | tblproperties (
@@ -913,7 +913,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
            |  id int,
            |  name string,
            |  price double,
-           |  ts long,
+           |  ts int,
            |  dt string
            | ) using hudi
            | tblproperties (
@@ -962,7 +962,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
            |  id int,
            |  name string,
            |  price double,
-           |  ts long,
+           |  ts int,
            |  dt string
            | ) using hudi
            | tblproperties (
@@ -992,7 +992,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
            |  id int,
            |  name string,
            |  price double,
-           |  ts long,
+           |  ts int,
            |  dt string
            | ) using hudi
            | tblproperties (
@@ -1080,7 +1080,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
                    |  dt = s0.dt
                    |when matched and s0.id = 2 then update set *
                """.stripMargin
-              )("No matching assignment found for target table record key 
field `id`")
+              )("MERGE INTO field resolution error: No matching assignment 
found for target table record key field `id`")
 
               checkException(
                 s"""
@@ -1095,7 +1095,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
                    |  ts = s0.ts,
                    |  dt = s0.dt
                """.stripMargin
-              )("No matching assignment found for target table record key 
field `id`")
+              )("MERGE INTO field resolution error: No matching assignment 
found for target table record key field `id`")
             }
 
             // Test 2: At least one partial insert assignment clause misses primary key.
@@ -1110,7 +1110,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
                  |values (s0.name, s0.price, s0.ts, s0.dt)
                  |when not matched and s0.id = 2 then insert *
                """.stripMargin
-            )("No matching assignment found for target table record key field 
`id`")
+            )("MERGE INTO field resolution error: No matching assignment found 
for target table record key field `id`")
 
             checkException(
               s"""
@@ -1122,7 +1122,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
                  |when not matched then insert (name, price, ts, dt)
                  |values (s0.name, s0.price, s0.ts, s0.dt)
                """.stripMargin
-            )("No matching assignment found for target table record key field 
`id`")
+            )("MERGE INTO field resolution error: No matching assignment found 
for target table record key field `id`")
 
             // Test 3: Partial insert missing preCombineField - only validate for EVENT_TIME_ORDERING
             val mergeStmt =
@@ -1139,7 +1139,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
 
             if (mergeMode == "EVENT_TIME_ORDERING") {
               checkException(mergeStmt)(
-                "No matching assignment found for target table precombine 
field `ts`"
+                "MERGE INTO field resolution error: No matching assignment 
found for target table precombine field `ts`"
               )
             } else {
               // For COMMIT_TIME_ORDERING, this should execute without error
@@ -1168,7 +1168,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
     withRecordType()(withTempDir { tmp =>
       Seq("cow", "mor").foreach { tableType =>
         Seq("COMMIT_TIME_ORDERING", "EVENT_TIME_ORDERING").foreach { mergeMode 
=>
-          withSparkSqlSessionConfig(DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key -> "false") {
+          withSparkSqlSessionConfig(DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key -> "true") {
             val tableName = generateTableName
             spark.sql(
               s"""
@@ -1221,28 +1221,6 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
       withTempDir { tmp =>
         Seq(RecordMergeMode.COMMIT_TIME_ORDERING.name(),
          RecordMergeMode.EVENT_TIME_ORDERING.name()).foreach { recordMergeMode =>
-          val sourceTable = generateTableName
-          spark.sql(
-            s"""
-               |CREATE TABLE $sourceTable (
-               |    id INT,
-               |    name STRING,
-               |    price INT,
-               |    ts BIGINT
-               |) USING hudi
-               | tblproperties (
-               |  type = '$tableType'
-               | )
-               |LOCATION '${tmp.getCanonicalPath}/$sourceTable'
-               |""".stripMargin)
-
-          spark.sql(
-            s"""
-               | INSERT INTO $sourceTable
-               | VALUES (1, 'John Doe', 19, 1),
-               |        (4, 'Alice Johnson', 49, 2)
-               |""".stripMargin)
-
           val targetTable = generateTableName
           spark.sql(
             s"""
@@ -1277,7 +1255,19 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
           spark.sql(
             s"""
                |MERGE INTO $targetTable t
-               |USING $sourceTable s
+               |USING (
+               | SELECT
+               |   CAST(1 AS INT) as id,
+               |   CAST('John Doe' AS STRING) as name,
+               |   CAST(19 AS INT) as price,
+               |   CAST(1 AS BIGINT) as ts
+               | UNION ALL
+               | SELECT
+               |   CAST(4 AS INT),
+               |   CAST('Alice Johnson' AS STRING),
+               |   CAST(49 AS INT),
+               |   CAST(2 AS BIGINT)
+               |) s
                |ON t.price = s.price
                |WHEN MATCHED THEN UPDATE SET
                |    t.id = s.id,
@@ -1303,30 +1293,6 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
   test("Test MergeInto with CUSTOM merge mode using FirstValueAvroPayload") {
     withRecordType()(withTempDir { tmp =>
       Seq("cow", "mor").foreach { tableType =>
-        val sourceTable = generateTableName
-        spark.sql(
-          s"""
-            |CREATE TABLE $sourceTable (
-            |    id INT,
-            |    name STRING,
-            |    price INT,
-            |    ts BIGINT
-            |) USING hudi
-            | tblproperties (
-            |  type = '$tableType'
-            | )
-            |LOCATION '${tmp.getCanonicalPath}/$sourceTable'
-            |""".stripMargin)
-
-        // Insert source data with same ts=1598886001 for id=1
-        spark.sql(
-          s"""
-            | INSERT INTO $sourceTable
-            | VALUES (1, 'John Doe Updated', 19, 1598886001),
-            |        (2, 'Jane Doe Updated', 24, 1598972401),
-            |        (4, 'Alice Johnson', 49, 2)
-            |""".stripMargin)
-
         val targetTable = generateTableName
         spark.sql(
           s"""
@@ -1337,7 +1303,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
             |  ts BIGINT
             |) using hudi
             |TBLPROPERTIES (
-            |  type = 'cow',
+            |  type = '$tableType',
             |  primaryKey = 'id',
             |  preCombineField = 'ts',
             |  recordMergeMode = '${RecordMergeMode.CUSTOM.name()}',
@@ -1347,44 +1313,60 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
             |LOCATION '${tmp.getCanonicalPath}/$targetTable'
             |""".stripMargin)
 
-        spark.sql(
-          s"""
-            |INSERT INTO $targetTable
-            |SELECT id, name, price, ts
-            |FROM (
-            |    SELECT 1 as id, 'John Doe Initial' as name, 19 as price, 1598886001 as ts
-            |     UNION ALL
-            |     SELECT 2, 'Jane Doe', 24, 1598972400
-            |     UNION ALL
-            |     SELECT 3, 'Bob Smith', 14, 1599058800
-            |)
-            |""".stripMargin)
+          spark.sql(
+            s"""
+              |INSERT INTO $targetTable
+              |SELECT id, name, price, ts
+              |FROM (
+              |    SELECT 1 as id, 'John Doe Initial' as name, 19 as price, 1598886001 as ts
+              |     UNION ALL
+              |     SELECT 2, 'Jane Doe', 24, 1598972400
+              |     UNION ALL
+              |     SELECT 3, 'Bob Smith', 14, 1599058800
+              |)
+              |""".stripMargin)
 
-        spark.sql(
-          s"""
-            |MERGE INTO $targetTable t
-            |USING $sourceTable s
-            |ON t.price = s.price
-            |WHEN MATCHED THEN UPDATE SET
-            |    t.id = s.id,
-            |    t.name = s.name,
-            |    t.price = s.price,
-            |    t.ts = s.ts
-            |WHEN NOT MATCHED THEN INSERT
-            |    (id, name, price, ts)
-            |VALUES
-            |    (s.id, s.name, s.price, s.ts)
-            |""".stripMargin)
+          spark.sql(
+            s"""
+              |MERGE INTO $targetTable t
+              |USING (
+              | SELECT
+              |   CAST(1 AS INT) as id,
+              |   CAST('John Doe Updated' AS STRING) as name,
+              |   CAST(19 AS INT) as price,
+              |   CAST(1598886001 AS BIGINT) as ts
+              | UNION ALL
+              | SELECT
+              |   CAST(2 AS INT),
+              |   CAST('Jane Doe Updated' AS STRING),
+              |   CAST(24 AS INT),
+              |   CAST(1598972401 AS BIGINT)
+              | UNION ALL
+              | SELECT
+              |   CAST(4 AS INT),
+              |   CAST('Alice Johnson' AS STRING),
+              |   CAST(49 AS INT),
+              |   CAST(2 AS BIGINT)
+              |) s
+              |ON t.price = s.price
+              |WHEN MATCHED THEN UPDATE SET
+              |    t.id = s.id,
+              |    t.name = s.name,
+              |    t.price = s.price,
+              |    t.ts = s.ts
+              |WHEN NOT MATCHED THEN INSERT
+              |    (id, name, price, ts)
+              |VALUES
+              |    (s.id, s.name, s.price, s.ts)
+              |""".stripMargin)
 
-        // Verify FirstValueAvroPayload behavior:
-        // - For id=1: keeps first value ("John Doe Initial") since timestamps are equal
-        // - For id=4: inserts new record normally
-        checkAnswer(s"select id, name, price, ts from $targetTable ORDER BY id")(
-          Seq(1, "John Doe Initial", 19, 1598886001L), // FirstValueAvroPayload keeps first record
-          Seq(2, "Jane Doe Updated", 24, 1598972401L),
-          Seq(3, "Bob Smith", 14, 1599058800L),
-          Seq(4, "Alice Johnson", 49, 2L))
+          checkAnswer(s"select id, name, price, ts from $targetTable ORDER BY 
id")(
+            Seq(1, "John Doe Initial", 19, 1598886001L),
+            Seq(2, "Jane Doe Updated", 24, 1598972401L),
+            Seq(3, "Bob Smith", 14, 1599058800L),
+            Seq(4, "Alice Johnson", 49, 2L))
+        }
       }
-    })
+    )
   }
 }
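
Both rewrites in this file replace auxiliary Hudi source tables with inline
subqueries whose columns are explicitly CAST, pinning the source schema
instead of inferring it from another table. The minimal pattern, assuming a
target with `id int` and `ts bigint` (illustrative names):

    spark.sql(
      s"""
         |merge into $targetTable t
         |using (
         |  select cast(1 as int) as id, cast(1 as bigint) as ts
         |  union all
         |  select cast(4 as int), cast(2 as bigint)
         |) s
         |on t.id = s.id
         |when matched then update set t.ts = s.ts
         |when not matched then insert (id, ts) values (s.id, s.ts)
         |""".stripMargin)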
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTableWithNonRecordKeyField.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTableWithNonRecordKeyField.scala
index 7282eddfb25..233e94b0999 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTableWithNonRecordKeyField.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTableWithNonRecordKeyField.scala
@@ -182,7 +182,7 @@ class TestMergeIntoTableWithNonRecordKeyField extends HoodieSparkSqlTestBase wit
            |  id int,
            |  name string,
            |  price double,
-           |  ts long
+           |  ts int
            |) using hudi
            | location '${tmp.getCanonicalPath}'
            | tblproperties (
@@ -257,7 +257,7 @@ class TestMergeIntoTableWithNonRecordKeyField extends HoodieSparkSqlTestBase wit
              |  id int,
              |  name string,
              |  price double,
-             |  ts long
+             |  ts int
              |) using hudi
              | location '${tmp.getCanonicalPath}'
              | $prekstr
@@ -307,7 +307,7 @@ class TestMergeIntoTableWithNonRecordKeyField extends HoodieSparkSqlTestBase wit
            |  id int,
            |  name string,
            |  price double,
-           |  ts long
+           |  ts int
            |) using hudi
            | location '${tmp.getCanonicalPath}'
            | tblproperties (
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeCommitTimeOrdering.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeCommitTimeOrdering.scala
index 5161e622a7c..0b705fd7cac 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeCommitTimeOrdering.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeCommitTimeOrdering.scala
@@ -212,6 +212,9 @@ class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase {
 
             // Delete record
             spark.sql(s"delete from $tableName where id = 1")
+            if (tableType == "mor") {
+              HoodieSparkSqlTestBase.validateDeleteLogBlockPrecombineNullOrZero(tmp.getCanonicalPath)
+            }
             validateTableConfig(
              storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs)
             // Verify deletion
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeEventTimeOrdering.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeEventTimeOrdering.scala
index 69a5c83d6ae..baf6c976100 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeEventTimeOrdering.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeEventTimeOrdering.scala
@@ -226,6 +226,9 @@ class TestMergeModeEventTimeOrdering extends HoodieSparkSqlTestBase {
           )
           // Delete record with no ts.
           spark.sql(s"delete from $tableName where id = 1")
+          if (tableType == "mor") {
+            HoodieSparkSqlTestBase.validateDeleteLogBlockPrecombineNullOrZero(tmp.getCanonicalPath)
+          }
           // Verify deletion
           validateTableConfig(
            storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs)
@@ -325,7 +328,7 @@ class TestMergeModeEventTimeOrdering extends HoodieSparkSqlTestBase {
               s"""
                  | merge into $tableName t
                  | using (
-                 |   select 7 as id, 'G' as name, 70.0 as price, 99 as ts union all
+                 |   select 7 as id, 'G' as name, 70.0 as price, 99L as ts union all
                  |   select 8, 'H', 80.0, 99 as ts
                  | ) s
                  | on t.id = s.id
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala
index f321d09039d..c12f78f27d3 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala
@@ -35,6 +35,7 @@ import org.apache.hudi.metadata.HoodieTableMetadata
 
 import org.apache.avro.Schema
 import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
+import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.getMetaClientAndFileSystemView
 import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructField}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
 
@@ -94,7 +95,7 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase {
            | id int,
            | name string,
            | price double,
-           | _ts long,
+           | _ts int,
            | description string
            |) using hudi
            |tblproperties(
@@ -144,7 +145,7 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase {
            | id int,
            | name string,
            | price double,
-           | _ts long,
+           | _ts int,
            | description string
            |) using hudi
            |tblproperties(
@@ -205,7 +206,7 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase {
            | id int,
            | name string,
            | price double,
-           | _ts long,
+           | _ts int,
            | description string
            |) using hudi
            |tblproperties(
@@ -219,7 +220,7 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase {
         StructField("id", IntegerType, nullable = true),
         StructField("name", StringType, nullable = true),
         StructField("price", DoubleType, nullable = true),
-        StructField("_ts", LongType, nullable = true),
+        StructField("_ts", IntegerType, nullable = true),
         StructField("description", StringType, nullable = true))
 
       spark.sql(s"insert into $tableName values (1, 'a1', 10, 1000, 'a1: 
desc1')," +
@@ -274,7 +275,7 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase {
            | id int,
            | name string,
            | price double,
-           | _ts long,
+           | _ts int,
            | description string
            |) using hudi
            |tblproperties(
@@ -288,7 +289,7 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase {
         StructField("id", IntegerType, nullable = true),
         StructField("name", StringType, nullable = true),
         StructField("price", DoubleType, nullable = true),
-        StructField("_ts", LongType, nullable = true),
+        StructField("_ts", IntegerType, nullable = true),
         StructField("description", StringType, nullable = true))
       spark.sql(s"insert into $tableName values (1, 'a1', 10, 1000, 'a1: 
desc1')," +
         "(2, 'a2', 20, 1200, 'a2: desc2'), (3, 'a3', 30, 1250, 'a3: desc3')")
@@ -444,7 +445,7 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase {
            | id int,
            | name string,
            | price double,
-           | _ts long,
+           | _ts int,
            | description string
            |) using hudi
            |tblproperties(
@@ -528,20 +529,7 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase {
                        expectedNumLogFile: Int,
                        changedFields: Seq[Seq[String]],
                        isPartial: Boolean): Unit = {
-    val storageConf = HoodieTestUtils.getDefaultStorageConf
-    val metaClient: HoodieTableMetaClient =
-      HoodieTableMetaClient.builder.setConf(storageConf).setBasePath(basePath).build
-    val metadataConfig = HoodieMetadataConfig.newBuilder.build
-    val engineContext = new HoodieLocalEngineContext(storageConf)
-    val viewManager: FileSystemViewManager = FileSystemViewManager.createViewManager(
-      engineContext, metadataConfig, FileSystemViewStorageConfig.newBuilder.build,
-      HoodieCommonConfig.newBuilder.build,
-      (v1: HoodieTableMetaClient) => {
-        HoodieTableMetadata.create(
-          engineContext, metaClient.getStorage, metadataConfig, metaClient.getBasePath.toString)
-      }
-    )
-    val fsView: SyncableFileSystemView = viewManager.getFileSystemView(metaClient)
+    val (metaClient, fsView) = getMetaClientAndFileSystemView(basePath)
     val fileSlice: Optional[FileSlice] = fsView.getAllFileSlices("")
       .filter((fileSlice: FileSlice) => {
         HoodieTestUtils.getLogFileListFromFileSlice(fileSlice).size() == expectedNumLogFile
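
The boilerplate removed above now lives behind
HoodieSparkSqlTestBase.getMetaClientAndFileSystemView. The helper's body is
not shown in this diff, but a sketch consistent with the deleted inline code
would be (the actual signature in HoodieSparkSqlTestBase may differ):

    // Inferred from the removed code: build a meta client plus a syncable
    // file-system view over the Hudi table at basePath.
    def getMetaClientAndFileSystemView(basePath: String): (HoodieTableMetaClient, SyncableFileSystemView) = {
      val storageConf = HoodieTestUtils.getDefaultStorageConf
      val metaClient = HoodieTableMetaClient.builder.setConf(storageConf).setBasePath(basePath).build
      val metadataConfig = HoodieMetadataConfig.newBuilder.build
      val engineContext = new HoodieLocalEngineContext(storageConf)
      val viewManager = FileSystemViewManager.createViewManager(
        engineContext, metadataConfig, FileSystemViewStorageConfig.newBuilder.build,
        HoodieCommonConfig.newBuilder.build,
        (v1: HoodieTableMetaClient) => HoodieTableMetadata.create(
          engineContext, metaClient.getStorage, metadataConfig, metaClient.getBasePath.toString))
      (metaClient, viewManager.getFileSystemView(metaClient))
    }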
