This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ca0999420c [HUDI-6464] Spark SQL Merge Into for pkless tables (#9083)
0ca0999420c is described below

commit 0ca0999420ca36b777b23fc9cf3998ce8f31481f
Author: Jon Vexler <[email protected]>
AuthorDate: Thu Jul 6 23:16:51 2023 -0400

    [HUDI-6464] Spark SQL Merge Into for pkless tables (#9083)
    
    Tables with a primary key must now join on all primary key columns. They 
can also join on the partition path columns, which is recommended if the 
table is not using a global index.
    
    Tables without a primary key can join on any columns in the table. If 
multiple source table columns match a single target table column, the precombine 
field will be used if set; otherwise, the behavior is nondeterministic. To improve 
performance, the Hudi meta columns are retained after the join, so that index 
lookup and key generation can be skipped.
---
 .../apache/hudi/config/HoodieInternalConfig.java   |   8 +
 .../hudi/index/HoodieInternalProxyIndex.java       |  71 +++++
 .../apache/hudi/index/SparkHoodieIndexFactory.java |   6 +
 .../factory/HoodieSparkKeyGeneratorFactory.java    |   6 +-
 .../org/apache/spark/sql/hudi/SparkAdapter.scala   |  13 +-
 .../main/scala/org/apache/hudi/DefaultSource.scala |   5 +-
 .../org/apache/hudi/HoodieCreateRecordUtils.scala  |  72 +++--
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  24 +-
 .../scala/org/apache/hudi/HoodieWriterUtils.scala  |   4 +-
 .../sql/hudi/command/MergeIntoKeyGenerator.scala   |  94 ++++++
 .../spark/sql/hudi/analysis/HoodieAnalysis.scala   |  43 ++-
 .../hudi/command/MergeIntoHoodieTableCommand.scala | 168 ++++++----
 .../apache/spark/sql/hudi/TestMergeIntoTable.scala |  12 +-
 .../spark/sql/hudi/TestMergeIntoTable2.scala       |  13 +-
 .../spark/sql/hudi/TestMergeIntoTable3.scala       | 353 +++++++++++++++++++++
 .../apache/spark/sql/adapter/Spark2Adapter.scala   |   7 +-
 .../catalyst/analysis/HoodieSpark2Analysis.scala   |   3 +-
 .../spark/sql/adapter/BaseSpark3Adapter.scala      |   6 +
 .../catalyst/analysis/HoodieSpark30Analysis.scala} |  29 +-
 .../catalyst/analysis/HoodieSpark31Analysis.scala} |  29 +-
 .../apache/spark/sql/adapter/Spark3_2Adapter.scala |   8 +-
 .../hudi/analysis/HoodieSpark32PlusAnalysis.scala  | 133 +++++++-
 .../apache/spark/sql/adapter/Spark3_3Adapter.scala |   8 +-
 .../apache/spark/sql/adapter/Spark3_4Adapter.scala |  12 +-
 24 files changed, 966 insertions(+), 161 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieInternalConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieInternalConfig.java
index 797df196441..a1c575c4f41 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieInternalConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieInternalConfig.java
@@ -46,6 +46,14 @@ public class HoodieInternalConfig extends HoodieConfig {
       .withDocumentation("For SQL operations, if enables bulk_insert 
operation, "
           + "this configure will take effect to decide overwrite whole table 
or partitions specified");
 
+  public static final ConfigProperty<Boolean> SQL_MERGE_INTO_WRITES = 
ConfigProperty
+      .key("hoodie.internal.sql.merge.into.writes")
+      .defaultValue(false)
+      .markAdvanced()
+      .sinceVersion("0.14.0")
+      .withDocumentation("This internal config is used by Merge Into SQL logic 
only to mark such use case "
+          + "and let the core components know it should handle the write 
differently");
+
   /**
    * Returns if partition records are sorted or not.
    *
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieInternalProxyIndex.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieInternalProxyIndex.java
new file mode 100644
index 00000000000..a0d53f6c2ed
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieInternalProxyIndex.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.index;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.table.HoodieTable;
+
+public class HoodieInternalProxyIndex extends HoodieIndex<Object, Object> {
+
+  /**
+   * Index that does not do tagging. Its purpose is to be used for the Spark SQL MERGE INTO command.
+   * MERGE INTO does not need to use an index lookup because we get the location from the meta
+   * columns from the join. The reason why we can't use the prepped path is that we sometimes need
+   * to do a mix of updates and inserts, and prepped only handles existing records.
+   */
+  public HoodieInternalProxyIndex(HoodieWriteConfig config) {
+    super(config);
+  }
+
+  @Override
+  public <R> HoodieData<HoodieRecord<R>> 
tagLocation(HoodieData<HoodieRecord<R>> records, HoodieEngineContext context, 
HoodieTable hoodieTable) throws HoodieIndexException {
+    return records;
+  }
+
+  @Override
+  public HoodieData<WriteStatus> updateLocation(HoodieData<WriteStatus> 
writeStatuses, HoodieEngineContext context, HoodieTable hoodieTable) throws 
HoodieIndexException {
+    return writeStatuses;
+  }
+
+  @Override
+  public boolean rollbackCommit(String instantTime) {
+    return true;
+  }
+
+  @Override
+  public boolean isGlobal() {
+    return false;
+  }
+
+  @Override
+  public boolean canIndexLogFiles() {
+    return false;
+  }
+
+  @Override
+  public boolean isImplicitWithStorage() {
+    return false;
+  }
+}
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndexFactory.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndexFactory.java
index 53218709259..c908f40e4f0 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndexFactory.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndexFactory.java
@@ -21,6 +21,7 @@ package org.apache.hudi.index;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodieInternalConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieIndexException;
@@ -43,6 +44,11 @@ import java.io.IOException;
  */
 public final class SparkHoodieIndexFactory {
   public static HoodieIndex createIndex(HoodieWriteConfig config) {
+    boolean mergeIntoWrites = 
config.getProps().getBoolean(HoodieInternalConfig.SQL_MERGE_INTO_WRITES.key(),
+        HoodieInternalConfig.SQL_MERGE_INTO_WRITES.defaultValue());
+    if (mergeIntoWrites) {
+      return new HoodieInternalProxyIndex(config);
+    }
     // first use index class config to create index.
     if (!StringUtils.isNullOrEmpty(config.getIndexClass())) {
       return HoodieIndexUtils.createUserDefinedIndex(config);
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java
index ae961b4dcf5..235bd63b99f 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java
@@ -45,6 +45,7 @@ import java.util.HashMap;
 import java.util.Locale;
 import java.util.Map;
 
+import static 
org.apache.hudi.config.HoodieInternalConfig.SQL_MERGE_INTO_WRITES;
 import static org.apache.hudi.config.HoodieWriteConfig.KEYGENERATOR_TYPE;
 import static org.apache.hudi.keygen.KeyGenUtils.inferKeyGeneratorType;
 
@@ -76,7 +77,10 @@ public class HoodieSparkKeyGeneratorFactory {
 
   public static KeyGenerator createKeyGenerator(TypedProperties props) throws 
IOException {
     String keyGeneratorClass = getKeyGeneratorClassName(props);
-    boolean autoRecordKeyGen = KeyGenUtils.enableAutoGenerateRecordKeys(props);
+    boolean autoRecordKeyGen = KeyGenUtils.enableAutoGenerateRecordKeys(props)
+        //Need to prevent overwriting the keygen for spark sql merge into 
because we need to extract
+        //the recordkey from the meta cols if it exists. Sql keygen will use 
pkless keygen if needed.
+        && !props.getBoolean(SQL_MERGE_INTO_WRITES.key(), 
SQL_MERGE_INTO_WRITES.defaultValue());
     try {
       KeyGenerator keyGenerator = (KeyGenerator) 
ReflectionUtils.loadClass(keyGeneratorClass, props);
       if (autoRecordKeyGen) {
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
index 073b5916ef3..1a72228f99c 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
@@ -26,10 +26,11 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.avro.{HoodieAvroDeserializer, 
HoodieAvroSchemaConverters, HoodieAvroSerializer}
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression, InterpretedPredicate}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, Expression, InterpretedPredicate}
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
-import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.JoinType
+import org.apache.spark.sql.catalyst.plans.logical.{Command, Join, LogicalPlan}
 import org.apache.spark.sql.catalyst.util.DateFormatter
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.execution.datasources._
@@ -202,4 +203,12 @@ trait SparkAdapter extends Serializable {
    * Converts instance of [[StorageLevel]] to a corresponding string
    */
   def convertStorageLevelToString(level: StorageLevel): String
+
+  /**
+   * Calls fail analysis on the given attribute for MERGE INTO resolution,
+   * reporting the candidate columns. No-op by default; adapters may override it.
+   */
+  def failAnalysisForMIT(a: Attribute, cols: String): Unit = {}
+
+  def createMITJoin(left: LogicalPlan, right: LogicalPlan, joinType: JoinType, 
condition: Option[Expression], hint: String): LogicalPlan
 }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index 2ceaa1606fd..1b961ba411a 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -29,6 +29,8 @@ import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
 import org.apache.hudi.common.util.ConfigUtils
 import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
+import org.apache.hudi.config.HoodieInternalConfig
+import org.apache.hudi.config.HoodieInternalConfig.SQL_MERGE_INTO_WRITES
 import org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.util.PathUtils
@@ -144,7 +146,8 @@ class DefaultSource extends RelationProvider
                               mode: SaveMode,
                               optParams: Map[String, String],
                               rawDf: DataFrame): BaseRelation = {
-    val df = if (optParams.getOrDefault(DATASOURCE_WRITE_PREPPED_KEY, "false")
+    val df = if (optParams.getOrDefault(DATASOURCE_WRITE_PREPPED_KEY,
+      optParams.getOrDefault(SQL_MERGE_INTO_WRITES.key(), 
SQL_MERGE_INTO_WRITES.defaultValue().toString))
       .equalsIgnoreCase("true")) {
       rawDf // Don't remove meta columns for prepped write.
     } else {
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala
index f55027ee1c2..bd90563abfd 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala
@@ -26,6 +26,7 @@ import org.apache.hudi.common.config.TypedProperties
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.{HoodieKey, HoodieRecord, 
HoodieRecordLocation, HoodieSparkRecord, WriteOperationType}
 import 
org.apache.hudi.common.model.HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS
+import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions
@@ -46,16 +47,31 @@ import scala.collection.JavaConversions.mapAsJavaMap
 object HoodieCreateRecordUtils {
   private val log = LoggerFactory.getLogger(getClass)
 
-  def createHoodieRecordRdd(df: DataFrame,
-                            config: HoodieWriteConfig,
-                            parameters: Map[String, String],
-                            recordName: String,
-                            recordNameSpace: String,
-                            writerSchema: Schema,
-                            dataFileSchema: Schema,
-                            operation: WriteOperationType,
-                            instantTime: String,
-                            isPrepped: Boolean) = {
+  case class createHoodieRecordRddArgs(df: DataFrame,
+                                       config: HoodieWriteConfig,
+                                       parameters: Map[String, String],
+                                       recordName: String,
+                                       recordNameSpace: String,
+                                       writerSchema: Schema,
+                                       dataFileSchema: Schema,
+                                       operation: WriteOperationType,
+                                       instantTime: String,
+                                       isPrepped: Boolean,
+                                       mergeIntoWrites: Boolean)
+
+  def createHoodieRecordRdd(args: createHoodieRecordRddArgs) = {
+    val df = args.df
+    val config = args.config
+    val parameters = args.parameters
+    val recordName = args.recordName
+    val recordNameSpace = args.recordNameSpace
+    val writerSchema = args.writerSchema
+    val dataFileSchema = args.dataFileSchema
+    val operation = args.operation
+    val instantTime = args.instantTime
+    val isPrepped = args.isPrepped
+    val mergeIntoWrites = args.mergeIntoWrites
+
     val shouldDropPartitionColumns = 
config.getBoolean(DataSourceWriteOptions.DROP_PARTITION_COLUMNS)
     val recordType = config.getRecordMerger.getRecordType
     val autoGenerateRecordKeys: Boolean = 
!parameters.containsKey(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())
@@ -112,8 +128,8 @@ object HoodieCreateRecordUtils {
             }
 
             val (hoodieKey: HoodieKey, recordLocation: 
Option[HoodieRecordLocation]) = 
HoodieCreateRecordUtils.getHoodieKeyAndMaybeLocationFromAvroRecord(keyGenerator,
 avroRec,
-              isPrepped)
-            val avroRecWithoutMeta: GenericRecord = if (isPrepped) {
+              isPrepped, mergeIntoWrites)
+            val avroRecWithoutMeta: GenericRecord = if (isPrepped || 
mergeIntoWrites) {
               HoodieAvroUtils.rewriteRecord(avroRec, 
HoodieAvroUtils.removeMetadataFields(dataFileSchema))
             } else {
               avroRec
@@ -170,7 +186,7 @@ object HoodieCreateRecordUtils {
               // Do validation only once.
               validatePreppedRecord = false
             }
-            val (key: HoodieKey, recordLocation: Option[HoodieRecordLocation]) 
= 
HoodieCreateRecordUtils.getHoodieKeyAndMayBeLocationFromSparkRecord(sparkKeyGenerator,
 sourceRow, sourceStructType, isPrepped)
+            val (key: HoodieKey, recordLocation: Option[HoodieRecordLocation]) 
= 
HoodieCreateRecordUtils.getHoodieKeyAndMayBeLocationFromSparkRecord(sparkKeyGenerator,
 sourceRow, sourceStructType, isPrepped, mergeIntoWrites)
 
             val targetRow = finalStructTypeRowWriter(sourceRow)
             var hoodieSparkRecord = new HoodieSparkRecord(key, targetRow, 
dataFileStructType, false)
@@ -206,27 +222,29 @@ object HoodieCreateRecordUtils {
   }
 
   def getHoodieKeyAndMaybeLocationFromAvroRecord(keyGenerator: 
Option[BaseKeyGenerator], avroRec: GenericRecord,
-                                                 isPrepped: Boolean): 
(HoodieKey, Option[HoodieRecordLocation]) = {
+                                                 isPrepped: Boolean, 
mergeIntoWrites: Boolean): (HoodieKey, Option[HoodieRecordLocation]) = {
+    //use keygen for mergeIntoWrites recordKey and partitionPath because the 
keygenerator handles
+    //fetching from the meta fields if they are populated and otherwise doing 
keygen
     val recordKey = if (isPrepped) {
       avroRec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString
     } else {
       keyGenerator.get.getRecordKey(avroRec)
-    };
+    }
 
     val partitionPath = if (isPrepped) {
       avroRec.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString
     } else {
       keyGenerator.get.getPartitionPath(avroRec)
-    };
+    }
 
     val hoodieKey = new HoodieKey(recordKey, partitionPath)
-    val instantTime: Option[String] = if (isPrepped) {
+    val instantTime: Option[String] = if (isPrepped || mergeIntoWrites) {
       
Option(avroRec.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD)).map(_.toString)
     }
     else {
       None
     }
-    val fileName: Option[String] = if (isPrepped) {
+    val fileName: Option[String] = if (isPrepped || mergeIntoWrites) {
       Option(avroRec.get(HoodieRecord.FILENAME_METADATA_FIELD)).map(_.toString)
     }
     else {
@@ -243,27 +261,29 @@ object HoodieCreateRecordUtils {
 
   def getHoodieKeyAndMayBeLocationFromSparkRecord(sparkKeyGenerator: 
Option[SparkKeyGeneratorInterface],
                                                   sourceRow: InternalRow, 
schema: StructType,
-                                                  isPrepped: Boolean): 
(HoodieKey, Option[HoodieRecordLocation]) = {
+                                                  isPrepped: Boolean, 
mergeIntoWrites: Boolean): (HoodieKey, Option[HoodieRecordLocation]) = {
+    //use keygen for mergeIntoWrites recordKey and partitionPath because the 
keygenerator handles
+    //fetching from the meta fields if they are populated and otherwise doing 
keygen
     val recordKey = if (isPrepped) {
-      
sourceRow.getString(HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
+      sourceRow.getString(HoodieRecord.RECORD_KEY_META_FIELD_ORD)
     } else {
       sparkKeyGenerator.get.getRecordKey(sourceRow, schema).toString
     }
 
     val partitionPath = if (isPrepped) {
-      
sourceRow.getString(HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD))
+      sourceRow.getString(HoodieRecord.PARTITION_PATH_META_FIELD_ORD)
     } else {
       sparkKeyGenerator.get.getPartitionPath(sourceRow, schema).toString
-    };
+    }
 
-    val instantTime: Option[String] = if (isPrepped) {
-      
Option(sourceRow.getString(HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD)))
+    val instantTime: Option[String] = if (isPrepped || mergeIntoWrites) {
+      Option(sourceRow.getString(HoodieRecord.COMMIT_TIME_METADATA_FIELD_ORD))
     } else {
       None
     }
 
-    val fileName: Option[String] = if (isPrepped) {
-      
Option(sourceRow.getString(HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.FILENAME_METADATA_FIELD)))
+    val fileName: Option[String] = if (isPrepped || mergeIntoWrites) {
+      Option(sourceRow.getString(HoodieRecord.FILENAME_META_FIELD_ORD))
     } else {
       None
     }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 2c0e95eabf2..39a8d94c9b5 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -45,6 +45,7 @@ import org.apache.hudi.common.table.{HoodieTableConfig, 
HoodieTableMetaClient, T
 import org.apache.hudi.common.util.{CommitUtils, StringUtils, Option => 
HOption}
 import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, 
INDEX_CLASS_NAME}
 import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig}
+import org.apache.hudi.config.HoodieInternalConfig.SQL_MERGE_INTO_WRITES
 import org.apache.hudi.exception.{HoodieException, 
SchemaCompatibilityException}
 import org.apache.hudi.hive.{HiveSyncConfigHolder, HiveSyncTool}
 import org.apache.hudi.internal.schema.InternalSchema
@@ -88,14 +89,6 @@ object HoodieSparkSqlWriter {
     ConfigProperty.key("hoodie.internal.write.schema.canonicalize.nullable")
       .defaultValue(true)
 
-  /**
-   * For merge into from spark-sql, we need some special handling. for eg, 
schema validation should be disabled
-   * for writes from merge into. This config is used for internal purposes.
-   */
-  val SQL_MERGE_INTO_WRITES: ConfigProperty[Boolean] =
-    ConfigProperty.key("hoodie.internal.sql.merge.into.writes")
-      .defaultValue(false)
-
   /**
    * For spark streaming use-cases, holds the batch Id.
    */
@@ -337,10 +330,12 @@ object HoodieSparkSqlWriter {
 
             // Remove meta columns from writerSchema if isPrepped is true.
             val isPrepped = 
hoodieConfig.getBooleanOrDefault(DATASOURCE_WRITE_PREPPED_KEY, false)
-            val processedDataSchema = if (isPrepped) {
-              HoodieAvroUtils.removeMetadataFields(writerSchema);
+            val mergeIntoWrites = 
parameters.getOrDefault(SQL_MERGE_INTO_WRITES.key(),
+               SQL_MERGE_INTO_WRITES.defaultValue.toString).toBoolean
+            val processedDataSchema = if (isPrepped || mergeIntoWrites) {
+              HoodieAvroUtils.removeMetadataFields(dataFileSchema)
             } else {
-              dataFileSchema;
+              dataFileSchema
             }
 
             // Create a HoodieWriteClient & issue the write.
@@ -372,8 +367,9 @@ object HoodieSparkSqlWriter {
             }
             // Convert to RDD[HoodieRecord]
             val hoodieRecords =
-              HoodieCreateRecordUtils.createHoodieRecordRdd(df, writeConfig, 
parameters, avroRecordName, avroRecordNamespace, writerSchema,
-                processedDataSchema, operation, instantTime, isPrepped)
+              
HoodieCreateRecordUtils.createHoodieRecordRdd(HoodieCreateRecordUtils.createHoodieRecordRddArgs(df,
+                writeConfig, parameters, avroRecordName, avroRecordNamespace, 
writerSchema,
+                processedDataSchema, operation, instantTime, isPrepped, 
mergeIntoWrites))
 
             val dedupedHoodieRecords =
               if (hoodieConfig.getBoolean(INSERT_DROP_DUPS)) {
@@ -438,7 +434,7 @@ object HoodieSparkSqlWriter {
         // in the table's one we want to proceed aligning nullability 
constraints w/ the table's schema
         val shouldCanonicalizeNullable = 
opts.getOrDefault(CANONICALIZE_NULLABLE.key,
           CANONICALIZE_NULLABLE.defaultValue.toString).toBoolean
-        val mergeIntoWrites = opts.getOrDefault(SQL_MERGE_INTO_WRITES.key(),
+        val mergeIntoWrites = 
opts.getOrDefault(HoodieInternalConfig.SQL_MERGE_INTO_WRITES.key(),
           SQL_MERGE_INTO_WRITES.defaultValue.toString).toBoolean
 
         val canonicalizedSourceSchema = if (shouldCanonicalizeNullable) {
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index 09a6d873e8b..405e761635a 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -28,7 +28,7 @@ import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, 
SimpleKeyGenerator}
 import org.apache.hudi.sync.common.HoodieSyncConfig
 import org.apache.hudi.util.SparkKeyGenUtils
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.hudi.command.SqlKeyGenerator
+import org.apache.spark.sql.hudi.command.{MergeIntoKeyGenerator, 
SqlKeyGenerator}
 
 import java.util.Properties
 import scala.collection.JavaConversions.mapAsJavaMap
@@ -115,7 +115,7 @@ object HoodieWriterUtils {
 
   def getOriginKeyGenerator(parameters: Map[String, String]): String = {
     val kg = parameters.getOrElse(KEYGENERATOR_CLASS_NAME.key(), null)
-    if (classOf[SqlKeyGenerator].getCanonicalName == kg) {
+    if (classOf[SqlKeyGenerator].getCanonicalName == kg || 
classOf[MergeIntoKeyGenerator].getCanonicalName == kg) {
       parameters.getOrElse(SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME, null)
     } else {
       kg
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoKeyGenerator.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoKeyGenerator.scala
new file mode 100644
index 00000000000..4399b25727b
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoKeyGenerator.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command
+
+import org.apache.avro.generic.GenericRecord
+import org.apache.hudi.common.config.TypedProperties
+import 
org.apache.hudi.common.model.HoodieRecord.{PARTITION_PATH_META_FIELD_ORD, 
RECORD_KEY_META_FIELD_ORD}
+import org.apache.hudi.common.util.StringUtils
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+
+/**
+ * NOTE TO USERS: YOU SHOULD NOT SET THIS AS YOUR KEYGENERATOR
+ *
+ * Keygenerator that is meant to be used internally for the spark sql merge 
into command
+ * It will attempt to get the partition path and recordkey from the 
metafields, but will
+ * fallback to the sql keygenerator if the meta field is not populated
+ *
+ */
+class MergeIntoKeyGenerator(props: TypedProperties) extends 
SqlKeyGenerator(props) {
+
+  override def getRecordKey(record: GenericRecord): String = {
+    val recordKey = record.get(RECORD_KEY_META_FIELD_ORD)
+    if (recordKey != null) {
+      recordKey.toString
+    } else {
+      super.getRecordKey(record)
+    }
+  }
+
+  override def getRecordKey(row: Row): String = {
+    val recordKey = row.getString(RECORD_KEY_META_FIELD_ORD)
+    if (!StringUtils.isNullOrEmpty(recordKey)) {
+      recordKey
+    } else {
+      super.getRecordKey(row)
+    }
+  }
+
+  override def getRecordKey(internalRow: InternalRow, schema: StructType): 
UTF8String = {
+    val recordKey = internalRow.getUTF8String(RECORD_KEY_META_FIELD_ORD)
+    if (recordKey != null) {
+      recordKey
+    } else {
+      super.getRecordKey(internalRow, schema)
+    }
+  }
+
+  override def getPartitionPath(record: GenericRecord): String = {
+    val partitionPath = record.get(PARTITION_PATH_META_FIELD_ORD)
+    if (partitionPath != null) {
+      partitionPath.toString
+    } else {
+      super.getPartitionPath(record)
+    }
+  }
+
+  override def getPartitionPath(row: Row): String = {
+    val partitionPath = row.getString(PARTITION_PATH_META_FIELD_ORD)
+    if (!StringUtils.isNullOrEmpty(partitionPath)) {
+      partitionPath
+    } else {
+      super.getPartitionPath(row)
+    }
+  }
+
+  override def getPartitionPath(internalRow: InternalRow, schema: StructType): 
UTF8String = {
+    val partitionPath = 
internalRow.getUTF8String(PARTITION_PATH_META_FIELD_ORD)
+    if (partitionPath != null) {
+      partitionPath
+    } else {
+      super.getPartitionPath(internalRow, schema)
+    }
+  }
+
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
index ad302962019..ddc2372b9a3 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
@@ -43,23 +43,39 @@ object HoodieAnalysis extends SparkAdapterSupport {
     val rules: ListBuffer[RuleBuilder] = ListBuffer()
 
     // NOTE: This rule adjusts [[LogicalRelation]]s resolving into Hudi tables 
such that
-    //       meta-fields are not affecting the resolution of the target 
columns to be updated by Spark.
+    //       meta-fields are not affecting the resolution of the target 
columns to be updated by Spark (Except in the
+    //       case of MergeInto. We leave the meta columns on the target table, 
and use other means to ensure resolution)
     //       For more details please check out the scala-doc of the rule
-    // TODO limit adapters to only Spark < 3.2
     val adaptIngestionTargetLogicalRelations: RuleBuilder = session => 
AdaptIngestionTargetLogicalRelations(session)
 
-    if (HoodieSparkUtils.isSpark2) {
-      val spark2ResolveReferencesClass = 
"org.apache.spark.sql.catalyst.analysis.HoodieSpark2Analysis$ResolveReferences"
-      val spark2ResolveReferences: RuleBuilder =
-        session => ReflectionUtils.loadClass(spark2ResolveReferencesClass, 
session).asInstanceOf[Rule[LogicalPlan]]
+    if (!HoodieSparkUtils.gteqSpark3_2) {
+      // Add or correct resolution of MergeInto.
+      // The way we load the class via reflection differs between Spark 2 and Spark 3, hence the split.
+      if (HoodieSparkUtils.isSpark2) {
+        val resolveReferencesClass = 
"org.apache.spark.sql.catalyst.analysis.HoodieSpark2Analysis$ResolveReferences"
+        val sparkResolveReferences: RuleBuilder =
+          session => ReflectionUtils.loadClass(resolveReferencesClass, 
session).asInstanceOf[Rule[LogicalPlan]]
+        // TODO elaborate on the ordering
+        rules += (adaptIngestionTargetLogicalRelations, sparkResolveReferences)
+      } else if (HoodieSparkUtils.isSpark3_0) {
+        val resolveReferencesClass = 
"org.apache.spark.sql.catalyst.analysis.HoodieSpark30Analysis$ResolveReferences"
+        val sparkResolveReferences: RuleBuilder = {
+          session => instantiateKlass(resolveReferencesClass, session)
+        }
+        // TODO elaborate on the ordering
+        rules += (adaptIngestionTargetLogicalRelations, sparkResolveReferences)
+      } else if (HoodieSparkUtils.isSpark3_1) {
+        val resolveReferencesClass = 
"org.apache.spark.sql.catalyst.analysis.HoodieSpark31Analysis$ResolveReferences"
+        val sparkResolveReferences: RuleBuilder =
+          session => instantiateKlass(resolveReferencesClass, session)
+        // TODO elaborate on the ordering
+        rules += (adaptIngestionTargetLogicalRelations, sparkResolveReferences)
+      } else {
+        throw new IllegalStateException("Impossible to be here")
+      }
 
-      // TODO elaborate on the ordering
-      rules += (adaptIngestionTargetLogicalRelations, spark2ResolveReferences)
     } else {
       rules += adaptIngestionTargetLogicalRelations
-    }
-
-    if (HoodieSparkUtils.gteqSpark3_2) {
       val dataSourceV2ToV1FallbackClass = 
"org.apache.spark.sql.hudi.analysis.HoodieDataSourceV2ToV1Fallback"
       val dataSourceV2ToV1Fallback: RuleBuilder =
         session => instantiateKlass(dataSourceV2ToV1FallbackClass, session)
@@ -192,9 +208,8 @@ object HoodieAnalysis extends SparkAdapterSupport {
           //       the data, as such we have to make sure that we handle both 
of these cases
           case mit@MatchMergeIntoTable(targetTable, query, _) =>
             val updatedTargetTable = targetTable match {
-              // In the receiving side of the MIT, we can't project meta-field 
attributes out,
-              // and instead have to explicitly remove them
-              case ResolvesToHudiTable(_) => 
Some(stripMetaFieldsAttributes(targetTable))
+              //Do not remove the meta cols here anymore
+              case ResolvesToHudiTable(_) => Some(targetTable)
               case _ => None
             }
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index 2b0aabacebe..d610758fd8c 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -20,21 +20,23 @@ package org.apache.spark.sql.hudi.command
 import org.apache.avro.Schema
 import org.apache.hudi.AvroConversionUtils.convertStructTypeToAvroSchema
 import org.apache.hudi.DataSourceWriteOptions._
-import org.apache.hudi.HoodieSparkSqlWriter.{CANONICALIZE_NULLABLE, 
SQL_MERGE_INTO_WRITES}
+import org.apache.hudi.HoodieSparkSqlWriter.CANONICALIZE_NULLABLE
+import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.common.model.HoodieAvroRecordMerger
 import org.apache.hudi.common.util.StringUtils
-import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.config.HoodieWriteConfig.{AVRO_SCHEMA_VALIDATE_ENABLE, 
SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP, TBL_NAME}
+import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig}
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hive.HiveSyncConfigHolder
 import org.apache.hudi.sync.common.HoodieSyncConfig
 import org.apache.hudi.util.JFunction.scalaFunction1Noop
-import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, 
HoodieSparkSqlWriter, SparkAdapterSupport}
+import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, 
HoodieSparkSqlWriter, HoodieSparkUtils, SparkAdapterSupport}
 import org.apache.spark.sql.HoodieCatalystExpressionUtils.{MatchCast, 
attributeEquals}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
 import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReference
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
AttributeReference, BoundReference, EqualTo, Expression, Literal, 
NamedExpression, PredicateHelper}
+import org.apache.spark.sql.catalyst.plans.LeftOuter
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
 import org.apache.spark.sql.hudi.ProvidesHoodieConfig
@@ -125,18 +127,68 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
    * expression involving [[source]] column(s), we will have to add "phony" 
column matching the
    * primary-key one of the target table.
    */
-  private lazy val primaryKeyAttributeToConditionExpression: Seq[(Attribute, 
Expression)] = {
+  private lazy val recordKeyAttributeToConditionExpression: Seq[(Attribute, 
Expression)] = {
+    val primaryKeyFields = hoodieCatalogTable.tableConfig.getRecordKeyFields
     val conditions = splitConjunctivePredicates(mergeInto.mergeCondition)
-    if (!conditions.forall(p => p.isInstanceOf[EqualTo])) {
-      throw new AnalysisException(s"Currently only equality predicates are 
supported in MERGE INTO statement " +
-        s"(provided ${mergeInto.mergeCondition.sql}")
+    if (primaryKeyFields.isPresent) {
+      //pkless tables can have more complex conditions
+      if (!conditions.forall(p => p.isInstanceOf[EqualTo])) {
+        throw new AnalysisException(s"Currently only equality predicates are 
supported in MERGE INTO statement on primary key table " +
+          s"(provided ${mergeInto.mergeCondition.sql}")
+      }
     }
-
     val resolver = sparkSession.sessionState.analyzer.resolver
-    val primaryKeyField = hoodieCatalogTable.tableConfig.getRecordKeyFieldProp
+    val partitionPathFields = hoodieCatalogTable.tableConfig.getPartitionFields
+    //ensure all primary key fields are part of the merge condition
+    //allow partition path to be part of the merge condition but not required
+    val targetAttr2ConditionExpressions = doCasting(conditions, 
primaryKeyFields.isPresent)
+    val expressionSet = scala.collection.mutable.Set[(Attribute, 
Expression)](targetAttr2ConditionExpressions:_*)
+    var partitionAndKeyFields: Seq[(String,String)] = Seq.empty
+    if (primaryKeyFields.isPresent) {
+     partitionAndKeyFields = partitionAndKeyFields ++ 
primaryKeyFields.get().map(pk => ("primaryKey", pk)).toSeq
+    }
+    if (partitionPathFields.isPresent) {
+      partitionAndKeyFields = partitionAndKeyFields ++ 
partitionPathFields.get().map(pp => ("partitionPath", pp)).toSeq
+    }
+    val resolvedCols = partitionAndKeyFields.map(rk => {
+      val resolving = expressionSet.collectFirst {
+        case (attr, expr) if resolver(attr.name, rk._2) =>
+          // NOTE: Here we validate that condition expression involving 
record-key column(s) is a simple
+          //       attribute-reference expression (possibly wrapped into a 
cast). This is necessary to disallow
+          //       statements like following
+          //
+          //         MERGE INTO ... AS t USING (
+          //            SELECT ... FROM ... AS s
+          //         )
+          //            ON t.id = s.id + 1
+          //            WHEN MATCHED THEN UPDATE *
+          //
+          //       Which (in the current design) could result in a primary key 
of the record being modified,
+          //       which is not allowed.
+          if (!resolvesToSourceAttribute(expr)) {
+            throw new AnalysisException("Only simple conditions of the form 
`t.id = s.id` are allowed on the " +
+              s"primary-key and partition path column. Found `${attr.sql} = 
${expr.sql}`")
+          }
+          expressionSet.remove((attr, expr))
+          (attr, expr)
+      }
+      if (resolving.isEmpty && rk._1.equals("primaryKey")) {
+        throw new AnalysisException(s"Hudi tables with primary key are 
required to match on all primary key columns. Column: '${rk._2}' not found")
+      }
+      resolving
+    }).filter(_.nonEmpty).map(_.get)
+
+    if (expressionSet.nonEmpty && primaryKeyFields.isPresent) {
+      //if pkless additional expressions are allowed
+      throw new AnalysisException(s"Only simple conditions of the form `t.id = 
s.id` using primary key or partition path columns are allowed on tables with 
primary key. " +
+        s"(illegal column(s) used: `${expressionSet.map(x => 
x._1.name).mkString("`,`")}`")
+    }
+    resolvedCols
+  }
 
-    val targetAttrs = mergeInto.targetTable.outputSet
 
+  private def doCasting(conditions: Seq[Expression], pkTable: Boolean): 
Seq[(Attribute, Expression)] = {
+    val targetAttrs = mergeInto.targetTable.outputSet
     val exprUtils = sparkAdapter.getCatalystExpressionUtils
     // Here we're unraveling superfluous casting of expressions on both sides 
of the matched-on condition,
     // in case both of them are casted to the same type (which might be result 
of either explicit casting
@@ -146,7 +198,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
     // as they are w/o the ability of transforming them w/ custom expressions 
(unlike in vanilla Spark flow).
     //
     // Check out HUDI-4861 for more details
-    val cleanedConditions = 
conditions.map(_.asInstanceOf[EqualTo]).map(stripCasting)
+    val cleanedConditions = conditions.map(stripCasting)
 
     // Expressions of the following forms are supported:
     //    `target.id = <expr>` (or `<expr> = target.id`)
@@ -156,7 +208,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
     // target table side (since we're gonna be matching against primary-key 
column as is) expression
     // on the opposite side of the comparison should be cast-able to the 
primary-key column's data-type
     // t/h "up-cast" (ie w/o any loss in precision)
-    val targetAttr2ConditionExpressions = cleanedConditions.map {
+    cleanedConditions.collect {
       case EqualTo(CoercedAttributeReference(attr), expr) if 
targetAttrs.exists(f => attributeEquals(f, attr)) =>
         if (exprUtils.canUpCast(expr.dataType, attr.dataType)) {
           // NOTE: It's critical we reference output attribute here and not 
the one from condition
@@ -177,32 +229,10 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
             + s"can't cast ${expr.sql} (of ${expr.dataType}) to 
${attr.dataType}")
         }
 
-      case expr =>
+      case expr if pkTable =>
         throw new AnalysisException(s"Invalid MERGE INTO matching condition: 
`${expr.sql}`: "
           + "expected condition should be 'target.id = <source-column-expr>', 
e.g. "
-          + "`t.id = s.id` or `t.id = cast(s.id, ...)`")
-    }
-
-    targetAttr2ConditionExpressions.collect {
-      case (attr, expr) if resolver(attr.name, primaryKeyField) =>
-        // NOTE: Here we validate that condition expression involving 
primary-key column(s) is a simple
-        //       attribute-reference expression (possibly wrapped into a 
cast). This is necessary to disallow
-        //       statements like following
-        //
-        //         MERGE INTO ... AS t USING (
-        //            SELECT ... FROM ... AS s
-        //         )
-        //            ON t.id = s.id + 1
-        //            WHEN MATCHED THEN UPDATE *
-        //
-        //       Which (in the current design) could result in a primary key 
of the record being modified,
-        //       which is not allowed.
-        if (!resolvesToSourceAttribute(expr)) {
-          throw new AnalysisException("Only simple conditions of the form 
`t.id = s.id` are allowed on the " +
-            s"primary-key column. Found `${attr.sql} = ${expr.sql}`")
-        }
-
-        (attr, expr)
+          + "`t.id = s.id` or `t.id = cast(s.id, ...)`")
     }
   }
 
@@ -245,11 +275,16 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
     // TODO move to analysis phase
     validate
 
-    val sourceDF: DataFrame = sourceDataset
+    if (HoodieSparkUtils.isSpark2) {
+      //already enabled by default for spark 3+
+      sparkSession.conf.set("spark.sql.crossJoin.enabled","true")
+    }
+
+    val projectedJoinedDF: DataFrame = projectedJoinedDataset
     // Create the write parameters
     val props = buildMergeIntoConfig(hoodieCatalogTable)
     // Do the upsert
-    executeUpsert(sourceDF, props)
+    executeUpsert(projectedJoinedDF, props)
     // Refresh the table in the catalog
     sparkSession.catalog.refreshTable(hoodieCatalogTable.table.qualifiedName)
 
@@ -298,18 +333,26 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
    * <li>{@code ts = source.sts}</li>
    * </ul>
    */
-  def sourceDataset: DataFrame = {
+  def projectedJoinedDataset: DataFrame = {
     val resolver = sparkSession.sessionState.analyzer.resolver
 
-    val sourceTablePlan = mergeInto.sourceTable
-    val sourceTableOutput = sourceTablePlan.output
+    // We want to join the source and target tables.
+    // Then we want to project the output so that we have the meta columns 
from the target table
+    // followed by the data columns of the source table
+    val tableMetaCols = mergeInto.targetTable.output.filter(a => 
isMetaField(a.name))
+    val joinData = sparkAdapter.createMITJoin(mergeInto.sourceTable, 
mergeInto.targetTable, LeftOuter, Some(mergeInto.mergeCondition), "NONE")
+    val incomingDataCols = 
joinData.output.filterNot(mergeInto.targetTable.outputSet.contains)
+    val projectedJoinPlan = Project(tableMetaCols ++ incomingDataCols, 
joinData)
+    val projectedJoinOutput = projectedJoinPlan.output
 
-    val requiredAttributesMap = primaryKeyAttributeToConditionExpression ++ 
preCombineAttributeAssociatedExpression
+    val requiredAttributesMap = recordKeyAttributeToConditionExpression ++ 
preCombineAttributeAssociatedExpression
 
     val (existingAttributesMap, missingAttributesMap) = 
requiredAttributesMap.partition {
-      case (keyAttr, _) => sourceTableOutput.exists(attr => 
resolver(keyAttr.name, attr.name))
+      case (keyAttr, _) => projectedJoinOutput.exists(attr => 
resolver(keyAttr.name, attr.name))
     }
 
+    // This is to handle the situation where the condition is something like "s0.s_id = t0.id", so in the source table
+    // we add an additional column that is an alias of "s0.s_id" named "id"
     // NOTE: Primary key attribute (required) as well as Pre-combine one 
(optional) defined
     //       in the [[targetTable]] schema has to be present in the incoming 
[[sourceTable]] dataset.
     //       In cases when [[sourceTable]] doesn't bear such attributes 
(which, for ex, could happen
@@ -317,7 +360,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
     //       them according to aforementioned heuristic) to meet Hudi's 
requirements
     val additionalColumns: Seq[NamedExpression] =
       missingAttributesMap.flatMap {
-        case (keyAttr, sourceExpression) if !sourceTableOutput.exists(attr => 
resolver(attr.name, keyAttr.name)) =>
+        case (keyAttr, sourceExpression) if !projectedJoinOutput.exists(attr 
=> resolver(attr.name, keyAttr.name)) =>
           Seq(Alias(sourceExpression, keyAttr.name)())
 
         case _ => Seq()
@@ -327,7 +370,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
     // matches to that one of the target table. This is necessary b/c unlike 
Spark, Avro is case-sensitive
     // and therefore would fail downstream if case of corresponding columns 
don't match
     val existingAttributes = existingAttributesMap.map(_._1)
-    val adjustedSourceTableOutput = sourceTableOutput.map { attr =>
+    val adjustedSourceTableOutput = projectedJoinOutput.map { attr =>
       existingAttributes.find(keyAttr => resolver(keyAttr.name, attr.name)) 
match {
         // To align the casing we just rename the attribute to match that one 
of the
         // target table
@@ -336,7 +379,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
       }
     }
 
-    val amendedPlan = Project(adjustedSourceTableOutput ++ additionalColumns, 
sourceTablePlan)
+    val amendedPlan = Project(adjustedSourceTableOutput ++ additionalColumns, 
projectedJoinPlan)
 
     Dataset.ofRows(sparkSession, amendedPlan)
   }
@@ -348,13 +391,13 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
    * expressions to the ExpressionPayload#getInsertValue.
    */
   private def executeUpsert(sourceDF: DataFrame, parameters: Map[String, 
String]): Unit = {
-    val operation = if 
(StringUtils.isNullOrEmpty(parameters.getOrElse(PRECOMBINE_FIELD.key, ""))) {
+    val operation = if 
(StringUtils.isNullOrEmpty(parameters.getOrElse(PRECOMBINE_FIELD.key, "")) && 
updatingActions.isEmpty) {
       INSERT_OPERATION_OPT_VAL
     } else {
       UPSERT_OPERATION_OPT_VAL
     }
 
-    // Append the table schema to the parameters. In the case of merge into, 
the schema of sourceDF
+    // Append the table schema to the parameters. In the case of merge into, 
the schema of projectedJoinedDF
     // may be different from the target table, because there is transformation logic in the update or
     // insert actions.
     var writeParams = parameters +
@@ -387,7 +430,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
     //  - Schema of the expected "joined" output of the [[sourceTable]] and 
[[targetTable]]
     writeParams ++= Seq(
       PAYLOAD_RECORD_AVRO_SCHEMA ->
-        convertStructTypeToAvroSchema(sourceDF.schema, "record", "").toString,
+        
HoodieAvroUtils.removeMetadataFields(convertStructTypeToAvroSchema(sourceDF.schema,
 "record", "")).toString,
       PAYLOAD_EXPECTED_COMBINED_SCHEMA -> 
encodeAsBase64String(toStructType(joinedExpectedOutput))
     )
 
@@ -483,7 +526,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
                 }
               case _ =>
                 throw new AnalysisException(s"Assignment expressions have to 
assign every attribute of target table " +
-                  s"(provided: `${assignments.map(_.sql).mkString(",")}`")
+                  s"(provided: `${assignments.map(_.sql).mkString(",")}`)")
             }
         }
       }
@@ -530,7 +573,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
     // NOTE: We're relying on [[sourceDataset]] here instead of 
[[mergeInto.sourceTable]],
     //       as it could be amended to add missing primary-key and/or 
pre-combine columns.
     //       Please check [[sourceDataset]] scala-doc for more details
-    sourceDataset.queryExecution.analyzed.output ++ 
mergeInto.targetTable.output
+    (projectedJoinedDataset.queryExecution.analyzed.output ++ 
mergeInto.targetTable.output).filterNot(a => isMetaField(a.name))
   }
 
   private def resolvesToSourceAttribute(expr: Expression): Boolean = {
@@ -576,13 +619,13 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
 
     val overridingOpts = Map(
       "path" -> path,
-      RECORDKEY_FIELD.key -> tableConfig.getRecordKeyFieldProp,
+      RECORDKEY_FIELD.key -> tableConfig.getRawRecordKeyFieldProp,
       PRECOMBINE_FIELD.key -> preCombineField,
       TBL_NAME.key -> hoodieCatalogTable.tableName,
       PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
       HIVE_STYLE_PARTITIONING.key -> 
tableConfig.getHiveStylePartitioningEnable,
       URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
-      KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
+      KEYGENERATOR_CLASS_NAME.key -> 
classOf[MergeIntoKeyGenerator].getCanonicalName,
       SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> 
tableConfig.getKeyGeneratorClassName,
       HoodieSyncConfig.META_SYNC_ENABLED.key -> 
hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
       HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> 
hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
@@ -605,7 +648,8 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
       RECONCILE_SCHEMA.key -> "false",
       CANONICALIZE_NULLABLE.key -> "false",
       SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key -> "true",
-      SQL_MERGE_INTO_WRITES.key -> "true"
+      HoodieInternalConfig.SQL_MERGE_INTO_WRITES.key -> "true",
+      HoodieWriteConfig.COMBINE_BEFORE_UPSERT.key() -> 
(!StringUtils.isNullOrEmpty(preCombineField)).toString
     )
 
     combineOptions(hoodieCatalogTable, tableConfig, 
sparkSession.sqlContext.conf,
@@ -628,17 +672,19 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
   private def checkInsertingActions(insertActions: Seq[InsertAction]): Unit = {
     insertActions.foreach(insert =>
       assert(insert.assignments.length <= targetTableSchema.length,
-        s"The number of insert assignments[${insert.assignments.length}] must 
less than or equal to the " +
+        s"The number of insert assignments[${insert.assignments.length}] must 
be less than or equal to the " +
           s"targetTable field size[${targetTableSchema.length}]"))
 
   }
 
   private def checkUpdatingActions(updateActions: Seq[UpdateAction]): Unit = {
-
-    //updateActions.foreach(update =>
-    //  assert(update.assignments.length == targetTableSchema.length,
-    //    s"The number of update assignments[${update.assignments.length}] 
must equal to the " +
-    //      s"targetTable field size[${targetTableSchema.length}]"))
+    if (hoodieCatalogTable.preCombineKey.isEmpty && updateActions.nonEmpty) {
+      logWarning(s"Updates without precombine can have nondeterministic 
behavior")
+    }
+    updateActions.foreach(update =>
+      assert(update.assignments.length <= targetTableSchema.length,
+        s"The number of update assignments[${update.assignments.length}] must 
be less than or equal to the " +
+          s"targetTable field size[${targetTableSchema.length}]"))
 
     // For MOR table, the target table field cannot be the right-value in the 
update action.
     if (targetTableType == MOR_TABLE_TYPE_OPT_VAL) {
@@ -666,7 +712,7 @@ object MergeIntoHoodieTableCommand {
     }
   }
 
-  def stripCasting(expr: EqualTo): EqualTo = expr match {
+  def stripCasting(expr: Expression): Expression = expr match {
     case EqualTo(MatchCast(leftExpr, leftTargetType, _, _), 
MatchCast(rightExpr, rightTargetType, _, _))
       if leftTargetType.sameType(rightTargetType) => EqualTo(leftExpr, 
rightExpr)
     case _ => expr
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
index a96fe669b57..7ee3e838a2e 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
@@ -352,11 +352,11 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase 
with ScalaAssertionSuppo
 
       // Delete with condition expression.
       val errorMessage = if (HoodieSparkUtils.gteqSpark3_2) {
-        "Only simple conditions of the form `t.id = s.id` are allowed on the 
primary-key column. Found `t0.id = (s0.s_id + 1)`"
+        "Only simple conditions of the form `t.id = s.id` are allowed on the 
primary-key and partition path column. Found `t0.id = (s0.s_id + 1)`"
       } else if (HoodieSparkUtils.gteqSpark3_1) {
-        "Only simple conditions of the form `t.id = s.id` are allowed on the 
primary-key column. Found `t0.`id` = (s0.`s_id` + 1)`"
+        "Only simple conditions of the form `t.id = s.id` are allowed on the 
primary-key and partition path column. Found `t0.`id` = (s0.`s_id` + 1)`"
       } else {
-        "Only simple conditions of the form `t.id = s.id` are allowed on the 
primary-key column. Found `t0.`id` = (s0.`s_id` + 1)`;"
+        "Only simple conditions of the form `t.id = s.id` are allowed on the 
primary-key and partition path column. Found `t0.`id` = (s0.`s_id` + 1)`;"
       }
 
       checkException(
@@ -628,11 +628,11 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase 
with ScalaAssertionSuppo
       // 1) set source column name to be same as target column
       //
       val complexConditionsErrorMessage = if (HoodieSparkUtils.gteqSpark3_2) {
-        "Only simple conditions of the form `t.id = s.id` are allowed on the 
primary-key column. Found `t0.id = (s0.id + 1)`"
+        "Only simple conditions of the form `t.id = s.id` are allowed on the 
primary-key and partition path column. Found `t0.id = (s0.id + 1)`"
       } else if (HoodieSparkUtils.gteqSpark3_1) {
-        "Only simple conditions of the form `t.id = s.id` are allowed on the 
primary-key column. Found `t0.`id` = (s0.`id` + 1)`"
+        "Only simple conditions of the form `t.id = s.id` are allowed on the 
primary-key and partition path column. Found `t0.`id` = (s0.`id` + 1)`"
       } else {
-        "Only simple conditions of the form `t.id = s.id` are allowed on the 
primary-key column. Found `t0.`id` = (s0.`id` + 1)`;"
+        "Only simple conditions of the form `t.id = s.id` are allowed on the 
primary-key and partition path column. Found `t0.`id` = (s0.`id` + 1)`;"
       }
 
       checkException(
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
index 48ba4ea0ca0..84f161444b0 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
@@ -373,6 +373,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
     })
   }
 
+  /* TODO [HUDI-6472]
   test("Test MergeInto When PrimaryKey And PreCombineField Of Source Table And 
Target Table Differ In Case Only") {
     withRecordType()(withTempDir { tmp =>
       spark.sql("set hoodie.payload.combined.schema.validate = true")
@@ -552,6 +553,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
       )
     })
   }
+*/
 
   test("Test only insert when source table contains history") {
     withRecordType()(withTempDir { tmp =>
@@ -634,7 +636,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
            |  select 1 as id1, 1 as id2, 'a1' as name, 11 as price, 110 as ts, 
'2022-08-19' as dt union all
            |  select 1 as id1, 2 as id2, 'a2' as name, 10 as price, 100 as ts, 
'2022-08-18' as dt
            | ) as s0
-           | on t0.id1 = s0.id1
+           | on t0.id1 = s0.id1 and t0.id2 = s0.id2
            | when not matched then insert *
          """.stripMargin
       )
@@ -876,7 +878,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
                | using (
                |  select 3 as id, 'a3' as name, 10.3 as price, 1003 as ts, 
'2021-03-21' as dt
                |  union all
-               |  select 1 as id, 'a2' as name, 10.2 as price, 1002 as ts, 
'2021-03-21' as dt
+               |  select 1 as id, 'a2' as name, 10.4 as price, 1004 as ts, 
'2021-03-21' as dt
                | ) as s0
                | on t0.id = s0.id
                | when matched then update set *
@@ -884,10 +886,9 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
          """.stripMargin
           )
           checkAnswer(s"select id, name, price, ts, dt from $tableName")(
-            Seq(1, "a1", 10.1, 1000, "2021-03-21"),
-            Seq(1, "a2", 10.2, 1002, "2021-03-21"),
-            Seq(3, "a3", 10.3, 1003, "2021-03-21"),
-            Seq(1, "a2", 10.2, 1002, "2021-03-21")
+            Seq(1, "a2", 10.4, 1004, "2021-03-21"),
+            Seq(1, "a2", 10.4, 1004, "2021-03-21"),
+            Seq(3, "a3", 10.3, 1003, "2021-03-21")
           )
         }
       }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable3.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable3.scala
new file mode 100644
index 00000000000..075fb332327
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable3.scala
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import org.apache.hudi.{HoodieSparkUtils, ScalaAssertionSupport}
+
+class TestMergeIntoTable3 extends HoodieSparkSqlTestBase with 
ScalaAssertionSupport {
+
+  test("Test Merge into extra cond") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  primaryKey ='id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      val tableName2 = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName2 (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}/$tableName2'
+           | tblproperties (
+           |  primaryKey ='id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+
+      spark.sql(
+        s"""
+           |insert into $tableName values
+           |    (1, 'a1', 10, 100),
+           |    (2, 'a2', 20, 200),
+           |    (3, 'a3', 20, 100)
+           |""".stripMargin)
+      spark.sql(
+        s"""
+           |insert into $tableName2 values
+           |    (1, 'u1', 10, 999),
+           |    (3, 'u3', 30, 9999),
+           |    (4, 'u4', 40, 99999)
+           |""".stripMargin)
+
+      spark.sql(
+        s"""
+           |merge into $tableName as oldData
+           |using $tableName2
+           |on oldData.id = $tableName2.id
+           |when matched and oldData.price = $tableName2.price then update set 
oldData.name = $tableName2.name
+           |
+           |""".stripMargin)
+
+      checkAnswer(s"select id, name, price, ts from $tableName")(
+        Seq(1, "u1", 10.0, 100),
+        Seq(3, "a3", 20.0, 100),
+        Seq(2, "a2", 20.0, 200)
+      )
+
+      val errorMessage = if (HoodieSparkUtils.gteqSpark3_1) {
+        "Only simple conditions of the form `t.id = s.id` using primary key or 
partition path " +
+          "columns are allowed on tables with primary key. (illegal column(s) 
used: `price`"
+      } else {
+        "Only simple conditions of the form `t.id = s.id` using primary key or 
partition path " +
+          "columns are allowed on tables with primary key. (illegal column(s) 
used: `price`;"
+      }
+
+      checkException(
+        s"""
+           |merge into $tableName as oldData
+           |using $tableName2
+           |on oldData.id = $tableName2.id and oldData.price = 
$tableName2.price
+           |when matched then update set oldData.name = $tableName2.name
+           |when not matched then insert *
+           |""".stripMargin)(errorMessage)
+
+      // test with multiple primary keys
+      val tableName3 = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName3 (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}/$tableName3'
+           | tblproperties (
+           |  primaryKey ='id,name',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+
+      val errorMessage2 = if (HoodieSparkUtils.gteqSpark3_1) {
+        "Hudi tables with primary key are required to match on all primary key 
colums. Column: 'name' not found"
+      } else {
+        "Hudi tables with primary key are required to match on all primary key 
colums. Column: 'name' not found;"
+      }
+
+      checkException(
+        s"""
+           |merge into $tableName3 as oldData
+           |using $tableName2
+           |on oldData.id = $tableName2.id
+           |when matched then update set oldData.name = $tableName2.name
+           |when not matched then insert *
+           |""".stripMargin)(errorMessage2)
+    }
+  }
+
+  test("Test pkless complex merge cond") {
+    withRecordType()(withTempDir { tmp =>
+      spark.sql("set hoodie.payload.combined.schema.validate = true")
+      val tableName = generateTableName
+      // Create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}'
+           | tblproperties (
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+
+      spark.sql(
+        s"""
+           |insert into $tableName values
+           |    (1, 'a1', 10, 100),
+           |    (2, 'a2', 20, 100),
+           |    (3, 'a3', 30, 100),
+           |    (4, 'a4', 40, 100),
+           |    (5, 'a5', 50, 100),
+           |    (6, 'a6', 60, 100)
+           |""".stripMargin)
+
+      // First merge: non-equality match condition updates ts on all matched rows
+      spark.sql(
+        s"""
+           | merge into $tableName
+           | using (
+           |  select 2 as id, 'a1' as name, 50 as price, 999 as ts
+           | ) s0
+           | on $tableName.price < s0.price and $tableName.id >= s0.id
+           | when matched then update set ts = s0.ts
+           | when not matched then insert *
+       """.stripMargin)
+      checkAnswer(s"select id, name, price, ts from $tableName")(
+        Seq(1, "a1", 10.0, 100),
+        Seq(2, "a2", 20.0, 999),
+        Seq(3, "a3", 30.0, 999),
+        Seq(4, "a4", 40.0, 999),
+        Seq(5, "a5", 50.0, 100),
+        Seq(6, "a6", 60.0, 100)
+      )
+
+      spark.sql(
+        s"""
+           | merge into $tableName
+           | using (
+           |  select 1 as id, 'a1' as name, 50 as price, 999 as ts
+           | ) s0
+           | on $tableName.id < s0.id
+           | when matched then update set ts = s0.ts
+           | when not matched then insert *
+       """.stripMargin)
+      checkAnswer(s"select id, name, price, ts from $tableName")(
+        Seq(1, "a1", 10.0, 100),
+        Seq(2, "a2", 20.0, 999),
+        Seq(3, "a3", 30.0, 999),
+        Seq(4, "a4", 40.0, 999),
+        Seq(5, "a5", 50.0, 100),
+        Seq(6, "a6", 60.0, 100),
+        Seq(1, "a1", 50.0, 999)
+      )
+
+    })
+  }
+
+  test("Test pkless multiple source match") {
+    for (withPrecombine <- Seq(true, false)) {
+      withRecordType()(withTempDir { tmp =>
+        spark.sql("set hoodie.payload.combined.schema.validate = true")
+        val tableName = generateTableName
+
+        val prekstr = if (withPrecombine) "tblproperties (preCombineField = 
'ts')" else ""
+        // Create table
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  ts long
+             |) using hudi
+             | location '${tmp.getCanonicalPath}'
+             | $prekstr
+         """.stripMargin)
+
+        spark.sql(
+          s"""
+             |insert into $tableName values
+             |    (1, 'a1', 10, 100)
+             |""".stripMargin)
+
+        // Merge where multiple source rows match a single target row
+        spark.sql(
+          s"""
+             | merge into $tableName
+             | using (
+             |  select 1 as id, 'a1' as name, 20 as price, 200 as ts
+             |  union all
+             |  select 2 as id, 'a1' as name, 30 as price, 100 as ts
+             | ) s0
+             | on $tableName.name = s0.name
+             | when matched then update set price = s0.price
+             | when not matched then insert *
+         """.stripMargin)
+        if (withPrecombine) {
+          checkAnswer(s"select id, name, price, ts from $tableName")(
+            Seq(1, "a1", 20.0, 100)
+          )
+        } else {
+          checkAnswer(s"select id, name, price, ts from $tableName")(
+            Seq(1, "a1", 30.0, 100)
+          )
+        }
+      })
+    }
+
+  }
+
+  test("Test MergeInto Basic pkless") {
+    withRecordType()(withTempDir { tmp =>
+      spark.sql("set hoodie.payload.combined.schema.validate = true")
+      val tableName = generateTableName
+      // Create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}'
+           | tblproperties (
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+
+      // First merge with an extra input field 'flag' (insert a new record)
+      spark.sql(
+        s"""
+           | merge into $tableName
+           | using (
+           |  select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '1' as 
flag
+           | ) s0
+           | on s0.id = $tableName.id
+           | when matched and flag = '1' then update set
+           | id = s0.id, name = s0.name, price = s0.price, ts = s0.ts
+           | when not matched and flag = '1' then insert *
+       """.stripMargin)
+      checkAnswer(s"select id, name, price, ts from $tableName")(
+        Seq(1, "a1", 10.0, 1000)
+      )
+
+      // Second merge (update the record)
+      spark.sql(
+        s"""
+           | merge into $tableName
+           | using (
+           |  select 1 as id, 'a1' as name, 10 as price, 1001 as ts
+           | ) s0
+           | on s0.id = $tableName.id
+           | when matched then update set
+           | id = s0.id, name = s0.name, price = s0.price + $tableName.price, 
ts = s0.ts
+           | when not matched then insert *
+       """.stripMargin)
+      checkAnswer(s"select id, name, price, ts from $tableName")(
+        Seq(1, "a1", 20.0, 1001)
+      )
+
+      // the third time merge (update & insert the record)
+      spark.sql(
+        s"""
+           | merge into $tableName
+           | using (
+           |  select * from (
+           |  select 1 as id, 'a1' as name, 10 as price, 1002 as ts
+           |  union all
+           |  select 2 as id, 'a2' as name, 12 as price, 1001 as ts
+           |  )
+           | ) s0
+           | on s0.id = $tableName.id
+           | when matched then update set
+           | id = s0.id, name = s0.name, price = s0.price + $tableName.price, 
ts = s0.ts
+           | when not matched and s0.id % 2 = 0 then insert *
+       """.stripMargin)
+      checkAnswer(s"select id, name, price, ts from $tableName")(
+        Seq(1, "a1", 30.0, 1002),
+        Seq(2, "a2", 12.0, 1001)
+      )
+
+      // the fourth merge (delete the record)
+      spark.sql(
+        s"""
+           | merge into $tableName
+           | using (
+           |  select 1 as id, 'a1' as name, 12 as price, 1003 as ts
+           | ) s0
+           | on s0.id = $tableName.id
+           | when matched and s0.id != 1 then update set
+           |    id = s0.id, name = s0.name, price = s0.price, ts = s0.ts
+           | when matched and s0.id = 1 then delete
+           | when not matched then insert *
+       """.stripMargin)
+      val cnt = spark.sql(s"select * from $tableName where id = 1").count()
+      assertResult(0)(cnt)
+    })
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
 
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
index c760bf3442b..d1c40728d54 100644
--- 
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
+++ 
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
@@ -29,7 +29,8 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression, InterpretedPredicate}
 import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.{Command, DeleteFromTable}
+import org.apache.spark.sql.catalyst.plans.JoinType
+import org.apache.spark.sql.catalyst.plans.logical.{Command, DeleteFromTable, 
Join, LogicalPlan}
 import org.apache.spark.sql.catalyst.util.DateFormatter
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, 
Spark24HoodieParquetFileFormat}
@@ -186,4 +187,8 @@ class Spark2Adapter extends SparkAdapter {
     case OFF_HEAP => "OFF_HEAP"
     case _ => throw new IllegalArgumentException(s"Invalid StorageLevel: 
$level")
   }
+
+  override def createMITJoin(left: LogicalPlan, right: LogicalPlan, joinType: 
JoinType, condition: Option[Expression], hint: String): LogicalPlan = {
+    Join(left, right, joinType, condition)
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/catalyst/analysis/HoodieSpark2Analysis.scala
 
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/catalyst/analysis/HoodieSpark2Analysis.scala
index 4401615e90b..1faceed10b4 100644
--- 
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/catalyst/analysis/HoodieSpark2Analysis.scala
+++ 
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/catalyst/analysis/HoodieSpark2Analysis.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, 
CurrentDate, CurrentTim
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Assignment, 
DeleteAction, InsertAction, LogicalPlan, MergeIntoTable, Project, UpdateAction, 
Window}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.toPrettySQL
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
 
 /**
  * NOTE: This code is borrowed from Spark 3.1.3
@@ -76,7 +77,7 @@ object HoodieSpark2Analysis {
                                    mergeInto: MergeIntoTable,
                                    resolveValuesWithSourceOnly: Boolean): 
Seq[Assignment] = {
       if (assignments.isEmpty) {
-        val expandedColumns = mergeInto.targetTable.output
+        val expandedColumns = 
HoodieSqlCommonUtils.removeMetaFields(mergeInto.targetTable.output)
         val expandedValues = mergeInto.sourceTable.output
         expandedColumns.zip(expandedValues).map(kv => Assignment(kv._1, kv._2))
       } else {
diff --git 
a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
 
b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
index d5a35980526..2e635e4677a 100644
--- 
a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
+++ 
b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
@@ -28,6 +28,8 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.avro.{HoodieAvroSchemaConverters, 
HoodieSparkAvroSchemaConverters}
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.{Expression, 
InterpretedPredicate, Predicate}
+import org.apache.spark.sql.catalyst.plans.JoinType
+import org.apache.spark.sql.catalyst.plans.logical.{Join, JoinHint, 
LogicalPlan}
 import org.apache.spark.sql.catalyst.util.DateFormatter
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.hudi.SparkAdapter
@@ -92,4 +94,8 @@ abstract class BaseSpark3Adapter extends SparkAdapter with 
Logging {
   }
 
   override def convertStorageLevelToString(level: StorageLevel): String
+
+  override def createMITJoin(left: LogicalPlan, right: LogicalPlan, joinType: 
JoinType, condition: Option[Expression], hint: String): LogicalPlan = {
+    Join(left, right, joinType, condition, JoinHint.NONE)
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/catalyst/analysis/HoodieSpark2Analysis.scala
 
b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/catalyst/analysis/HoodieSpark30Analysis.scala
similarity index 88%
copy from 
hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/catalyst/analysis/HoodieSpark2Analysis.scala
copy to 
hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/catalyst/analysis/HoodieSpark30Analysis.scala
index 4401615e90b..96d95b4a903 100644
--- 
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/catalyst/analysis/HoodieSpark2Analysis.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/catalyst/analysis/HoodieSpark30Analysis.scala
@@ -18,28 +18,35 @@
 package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, 
ResolveLambdaVariables, UnresolvedAttribute, UnresolvedExtractValue, 
caseInsensitiveResolution, withPosition}
 import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentDate, 
CurrentTimestamp, Expression, ExtractValue, GetStructField, LambdaFunction}
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Assignment, 
DeleteAction, InsertAction, LogicalPlan, MergeIntoTable, Project, UpdateAction, 
Window}
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.toPrettySQL
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
 
 /**
- * NOTE: This code is borrowed from Spark 3.1.3
- *       This code is borrowed, so that we can have some advanced Spark SQL 
functionality (like Merge Into, for ex)
- *       in Spark 2.x
+ * NOTE: Taken from HoodieSpark2Analysis and modified to resolve source and 
target tables if not already resolved
  *
  *       PLEASE REFRAIN MAKING ANY CHANGES TO THIS CODE UNLESS ABSOLUTELY 
NECESSARY
  */
-object HoodieSpark2Analysis {
+object HoodieSpark30Analysis {
 
   case class ResolveReferences(spark: SparkSession) extends Rule[LogicalPlan] {
 
     private val resolver = spark.sessionState.conf.resolver
 
     override def apply(plan: LogicalPlan): LogicalPlan = 
plan.resolveOperatorsUp {
-      case m @ MergeIntoTable(targetTable, sourceTable, _, _, _)
-        if (!m.resolved || containsUnresolvedStarAssignments(m)) && 
targetTable.resolved && sourceTable.resolved =>
-
+      case mO @ MergeIntoTable(targetTableO, sourceTableO, _, _, _)
+        //// Hudi change: avoid falling through to Spark's own MERGE INTO 
resolution by resolving the source and target tables here if not yet resolved
+        //
+        if !mO.resolved || containsUnresolvedStarAssignments(mO) =>
+        lazy val analyzer = spark.sessionState.analyzer
+        val targetTable = if (targetTableO.resolved) targetTableO else 
analyzer.execute(targetTableO)
+        val sourceTable = if (sourceTableO.resolved) sourceTableO else 
analyzer.execute(sourceTableO)
+        val m = mO.copy(targetTable = targetTable, sourceTable = sourceTable)
+        //
+        ////
         EliminateSubqueryAliases(targetTable) match {
           case _ =>
             val newMatchedActions = m.matchedActions.map {
@@ -76,7 +83,11 @@ object HoodieSpark2Analysis {
                                    mergeInto: MergeIntoTable,
                                    resolveValuesWithSourceOnly: Boolean): 
Seq[Assignment] = {
       if (assignments.isEmpty) {
-        val expandedColumns = mergeInto.targetTable.output
+        ////Hudi change: filter out meta fields
+        //
+        val expandedColumns = 
HoodieSqlCommonUtils.removeMetaFields(mergeInto.targetTable.output)
+        //
+        ////
         val expandedValues = mergeInto.sourceTable.output
         expandedColumns.zip(expandedValues).map(kv => Assignment(kv._1, kv._2))
       } else {
diff --git 
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/catalyst/analysis/HoodieSpark2Analysis.scala
 
b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/catalyst/analysis/HoodieSpark31Analysis.scala
similarity index 88%
copy from 
hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/catalyst/analysis/HoodieSpark2Analysis.scala
copy to 
hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/catalyst/analysis/HoodieSpark31Analysis.scala
index 4401615e90b..1dc57acbc68 100644
--- 
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/catalyst/analysis/HoodieSpark2Analysis.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/catalyst/analysis/HoodieSpark31Analysis.scala
@@ -18,28 +18,35 @@
 package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, 
ResolveLambdaVariables, UnresolvedAttribute, UnresolvedExtractValue, 
caseInsensitiveResolution, withPosition}
 import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentDate, 
CurrentTimestamp, Expression, ExtractValue, GetStructField, LambdaFunction}
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Assignment, 
DeleteAction, InsertAction, LogicalPlan, MergeIntoTable, Project, UpdateAction, 
Window}
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.toPrettySQL
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
 
 /**
- * NOTE: This code is borrowed from Spark 3.1.3
- *       This code is borrowed, so that we can have some advanced Spark SQL 
functionality (like Merge Into, for ex)
- *       in Spark 2.x
+ * NOTE: Taken from HoodieSpark2Analysis and modified to resolve source and 
target tables if not already resolved
  *
  *       PLEASE REFRAIN MAKING ANY CHANGES TO THIS CODE UNLESS ABSOLUTELY 
NECESSARY
  */
-object HoodieSpark2Analysis {
+object HoodieSpark31Analysis {
 
   case class ResolveReferences(spark: SparkSession) extends Rule[LogicalPlan] {
 
     private val resolver = spark.sessionState.conf.resolver
 
     override def apply(plan: LogicalPlan): LogicalPlan = 
plan.resolveOperatorsUp {
-      case m @ MergeIntoTable(targetTable, sourceTable, _, _, _)
-        if (!m.resolved || containsUnresolvedStarAssignments(m)) && 
targetTable.resolved && sourceTable.resolved =>
-
+      case mO @ MergeIntoTable(targetTableO, sourceTableO, _, _, _)
+        //// Hudi change: avoid falling through to Spark's own MERGE INTO 
resolution by resolving the source and target tables here if not yet resolved
+        //
+        if !mO.resolved || containsUnresolvedStarAssignments(mO) =>
+        lazy val analyzer = spark.sessionState.analyzer
+        val targetTable = if (targetTableO.resolved) targetTableO else 
analyzer.execute(targetTableO)
+        val sourceTable = if (sourceTableO.resolved) sourceTableO else 
analyzer.execute(sourceTableO)
+        val m = mO.copy(targetTable = targetTable, sourceTable = sourceTable)
+        //
+        ////
         EliminateSubqueryAliases(targetTable) match {
           case _ =>
             val newMatchedActions = m.matchedActions.map {
@@ -76,7 +83,11 @@ object HoodieSpark2Analysis {
                                    mergeInto: MergeIntoTable,
                                    resolveValuesWithSourceOnly: Boolean): 
Seq[Assignment] = {
       if (assignments.isEmpty) {
-        val expandedColumns = mergeInto.targetTable.output
+        ////Hudi change: filter out meta fields
+        //
+        val expandedColumns = 
HoodieSqlCommonUtils.removeMetaFields(mergeInto.targetTable.output)
+        //
+        ////
         val expandedValues = mergeInto.sourceTable.output
         expandedColumns.zip(expandedValues).map(kv => Assignment(kv._1, kv._2))
       } else {
diff --git 
a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala
 
b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala
index f07d0ccdc63..91e24979717 100644
--- 
a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala
@@ -22,9 +22,9 @@ import org.apache.hudi.Spark32HoodieFileScanRDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.avro._
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.analysis.{AnalysisErrorAt, 
EliminateSubqueryAliases}
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, Expression}
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.{Command, DeleteFromTable, 
LogicalPlan}
@@ -123,4 +123,8 @@ class Spark3_2Adapter extends BaseSpark3Adapter {
     case OFF_HEAP => "OFF_HEAP"
     case _ => throw new IllegalArgumentException(s"Invalid StorageLevel: 
$level")
   }
+
+  override def failAnalysisForMIT(a: Attribute, cols: String): Unit = {
+    a.failAnalysis(s"cannot resolve ${a.sql} in MERGE command given columns 
[$cols]")
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala
 
b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala
index fc4d5d18ff2..cd02ded580a 100644
--- 
a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala
@@ -20,15 +20,18 @@ package org.apache.spark.sql.hudi.analysis
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.{DataSourceReadOptions, DefaultSource, 
SparkAdapterSupport}
 import org.apache.spark.sql.HoodieSpark3CatalystPlanUtils.MatchResolvedTable
-import org.apache.spark.sql.catalyst.analysis.UnresolvedPartitionSpec
+import 
org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer.resolveExpressionByPlanChildren
+import org.apache.spark.sql.catalyst.analysis.{AnalysisErrorAt, 
EliminateSubqueryAliases, NamedRelation, UnresolvedAttribute, 
UnresolvedPartitionSpec}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils}
-import org.apache.spark.sql.catalyst.plans.logcal.{HoodieQuery, 
HoodieTableChanges, HoodieTableChangesByPath, HoodieTableChangesOptionsParser}
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.logcal.{HoodieQuery, 
HoodieTableChanges, HoodieTableChangesOptionsParser}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import 
org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
 import org.apache.spark.sql.connector.catalog.{Table, V1Table}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.execution.datasources.{DataSource, LogicalRelation}
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isMetaField
 import org.apache.spark.sql.hudi.ProvidesHoodieConfig
 import 
org.apache.spark.sql.hudi.analysis.HoodieSpark32PlusAnalysis.{HoodieV1OrV2Table,
 ResolvesToHudiTable}
 import org.apache.spark.sql.hudi.catalog.HoodieInternalV2Table
@@ -128,9 +131,135 @@ case class HoodieSpark32PlusResolveReferences(spark: 
SparkSession) extends Rule[
           catalogTable.location.toString))
         LogicalRelation(relation, catalogTable)
       }
+    case mO@MatchMergeIntoTable(targetTableO, sourceTableO, _)
+      //// Hudi change: we don't want to fall through to Spark's MergeIntoTable 
resolution, so we resolve the source and target plans here if they haven't been 
resolved yet
+      //
+      if !mO.resolved =>
+      lazy val analyzer = spark.sessionState.analyzer
+      val targetTable = if (targetTableO.resolved) targetTableO else 
analyzer.execute(targetTableO)
+      val sourceTable = if (sourceTableO.resolved) sourceTableO else 
analyzer.execute(sourceTableO)
+      val m = mO.asInstanceOf[MergeIntoTable].copy(targetTable = targetTable, 
sourceTable = sourceTable)
+      //
+      ////
+      EliminateSubqueryAliases(targetTable) match {
+        case r: NamedRelation if r.skipSchemaResolution =>
+          // Do not resolve the expression if the target table accepts any 
schema.
+          // This allows data sources to customize their own resolution logic 
using
+          // custom resolution rules.
+          m
+
+        case _ =>
+          val newMatchedActions = m.matchedActions.map {
+            case DeleteAction(deleteCondition) =>
+              val resolvedDeleteCondition = deleteCondition.map(
+                resolveExpressionByPlanChildren(_, m))
+              DeleteAction(resolvedDeleteCondition)
+            case UpdateAction(updateCondition, assignments) =>
+              val resolvedUpdateCondition = updateCondition.map(
+                resolveExpressionByPlanChildren(_, m))
+              UpdateAction(
+                resolvedUpdateCondition,
+                // The update value can access columns from both target and 
source tables.
+                resolveAssignments(assignments, m, resolveValuesWithSourceOnly 
= false))
+            case UpdateStarAction(updateCondition) =>
+              ////Hudi change: filter out meta fields
+              //
+              val assignments = targetTable.output.filter(a => 
!isMetaField(a.name)).map { attr =>
+                Assignment(attr, UnresolvedAttribute(Seq(attr.name)))
+              }
+              //
+              ////
+              UpdateAction(
+                updateCondition.map(resolveExpressionByPlanChildren(_, m)),
+                // For UPDATE *, the value must come from the source table.
+                resolveAssignments(assignments, m, resolveValuesWithSourceOnly 
= true))
+            case o => o
+          }
+          val newNotMatchedActions = m.notMatchedActions.map {
+            case InsertAction(insertCondition, assignments) =>
+              // The insert action is used when not matched, so its condition 
and value can only
+              // access columns from the source table.
+              val resolvedInsertCondition = insertCondition.map(
+                resolveExpressionByPlanChildren(_, Project(Nil, 
m.sourceTable)))
+              InsertAction(
+                resolvedInsertCondition,
+                resolveAssignments(assignments, m, resolveValuesWithSourceOnly 
= true))
+            case InsertStarAction(insertCondition) =>
+              // The insert action is used when not matched, so its condition 
and value can only
+              // access columns from the source table.
+              val resolvedInsertCondition = insertCondition.map(
+                resolveExpressionByPlanChildren(_, Project(Nil, 
m.sourceTable)))
+              ////Hudi change: filter out meta fields
+              //
+              val assignments = targetTable.output.filter(a => 
!isMetaField(a.name)).map { attr =>
+                Assignment(attr, UnresolvedAttribute(Seq(attr.name)))
+              }
+              //
+              ////
+              InsertAction(
+                resolvedInsertCondition,
+                resolveAssignments(assignments, m, resolveValuesWithSourceOnly 
= true))
+            case o => o
+          }
+          val resolvedMergeCondition = 
resolveExpressionByPlanChildren(m.mergeCondition, m)
+          m.copy(mergeCondition = resolvedMergeCondition,
+            matchedActions = newMatchedActions,
+            notMatchedActions = newNotMatchedActions)
+      }
+  }
+
+  def resolveAssignments(
+                          assignments: Seq[Assignment],
+                          mergeInto: MergeIntoTable,
+                          resolveValuesWithSourceOnly: Boolean): 
Seq[Assignment] = {
+    assignments.map { assign =>
+      val resolvedKey = assign.key match {
+        case c if !c.resolved =>
+          resolveMergeExprOrFail(c, Project(Nil, mergeInto.targetTable))
+        case o => o
+      }
+      val resolvedValue = assign.value match {
+        // The update values may contain target and/or source references.
+        case c if !c.resolved =>
+          if (resolveValuesWithSourceOnly) {
+            resolveMergeExprOrFail(c, Project(Nil, mergeInto.sourceTable))
+          } else {
+            resolveMergeExprOrFail(c, mergeInto)
+          }
+        case o => o
+      }
+      Assignment(resolvedKey, resolvedValue)
+    }
+  }
+
+  private def resolveMergeExprOrFail(e: Expression, p: LogicalPlan): 
Expression = {
+    try {
+      val resolved = resolveExpressionByPlanChildren(e, p)
+      resolved.references.filter(!_.resolved).foreach { a =>
+        // Note: This will throw an error only for unresolved attribute issues,
+        // not for other resolution errors such as mismatched data types.
+        val cols = p.inputSet.toSeq.map(_.sql).mkString(", ")
+        //// change from Spark: the Spark 3.4 failAnalysis signature differs, so 
the call is delegated to the version-specific adapter
+        //
+        sparkAdapter.failAnalysisForMIT(a, cols)
+        //
+        ////
+      }
+      resolved
+    } catch {
+      case x: AnalysisException => throw x
+    }
+  }
+
+  private[sql] object MatchMergeIntoTable {
+    def unapply(plan: LogicalPlan): Option[(LogicalPlan, LogicalPlan, 
Expression)] =
+      sparkAdapter.getCatalystPlanUtils.unapplyMergeIntoTable(plan)
   }
+
 }
 
+
+
 /**
  * Rule replacing resolved Spark's commands (not working for Hudi tables 
out-of-the-box) with
  * corresponding Hudi implementations
diff --git 
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
 
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
index 651027f932d..e481c9302f9 100644
--- 
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
@@ -22,9 +22,9 @@ import org.apache.hudi.Spark33HoodieFileScanRDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.avro._
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.analysis.{AnalysisErrorAt, 
EliminateSubqueryAliases}
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, Expression}
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -124,4 +124,8 @@ class Spark3_3Adapter extends BaseSpark3Adapter {
     case OFF_HEAP => "OFF_HEAP"
     case _ => throw new IllegalArgumentException(s"Invalid StorageLevel: 
$level")
   }
+
+  override def failAnalysisForMIT(a: Attribute, cols: String): Unit = {
+    a.failAnalysis(s"cannot resolve ${a.sql} in MERGE command given columns 
[$cols]")
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
 
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
index 3bd8256c3aa..cfeacabce42 100644
--- 
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
@@ -21,9 +21,9 @@ import org.apache.avro.Schema
 import org.apache.hudi.Spark34HoodieFileScanRDD
 import org.apache.spark.sql.avro._
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.analysis.{AnalysisErrorAt, 
EliminateSubqueryAliases}
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, Expression}
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -124,4 +124,12 @@ class Spark3_4Adapter extends BaseSpark3Adapter {
     case OFF_HEAP => "OFF_HEAP"
     case _ => throw new IllegalArgumentException(s"Invalid StorageLevel: 
$level")
   }
+
+  override def failAnalysisForMIT(a: Attribute, cols: String): Unit = {
+    a.failAnalysis(
+      errorClass = "_LEGACY_ERROR_TEMP_2309",
+      messageParameters = Map(
+        "sqlExpr" -> a.sql,
+        "cols" -> cols))
+  }
 }

Reply via email to