This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f2fdf8a8052 [HUDI-6315] Feature flag for disabling prepped merge. (#9203)
f2fdf8a8052 is described below

commit f2fdf8a805285d9f79444c3247726da8e244fbfb
Author: Amrish Lal <[email protected]>
AuthorDate: Thu Jul 20 16:22:35 2023 -0700

    [HUDI-6315] Feature flag for disabling prepped merge. (#9203)
    
    - Add user-defined feature flag for disabling prepped merge ("hoodie.spark.sql.optimized.writes.enable")
---
 .../apache/hudi/config/HoodieInternalConfig.java   |   8 -
 .../org/apache/hudi/config/HoodieWriteConfig.java  |   6 +
 .../apache/hudi/index/SparkHoodieIndexFactory.java |   6 +-
 .../factory/HoodieSparkKeyGeneratorFactory.java    |   4 +-
 .../scala/org/apache/hudi/DataSourceOptions.scala  |  11 +-
 .../main/scala/org/apache/hudi/DefaultSource.scala |  10 +-
 .../org/apache/hudi/HoodieCreateRecordUtils.scala  |  26 +--
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  23 +-
 .../scala/org/apache/hudi/HoodieWriterUtils.scala  |   1 -
 .../hudi/command/DeleteHoodieTableCommand.scala    |  12 +-
 .../hudi/command/MergeIntoHoodieTableCommand.scala |  23 +-
 .../hudi/command/UpdateHoodieTableCommand.scala    |  12 +-
 .../hudi-spark/src/test/java/HoodieJavaApp.java    |   2 +-
 .../apache/spark/sql/hudi/TestDeleteTable.scala    |  10 +-
 .../apache/spark/sql/hudi/TestMergeIntoTable.scala | 236 +++++++++++----------
 .../TestMergeIntoTableWithNonRecordKeyField.scala  | 207 +++++++++---------
 .../apache/spark/sql/hudi/TestUpdateTable.scala    |  11 +-
 17 files changed, 322 insertions(+), 286 deletions(-)
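
The new flag is a session-level Spark SQL conf. A minimal sketch of toggling it, patterned on the `set` statements exercised in the tests below (the table name `hudi_tbl` and the UPDATE statement are illustrative placeholders, not part of this commit):

    // Prepped (optimized) writes default to "true"; set to "false" to fall
    // back to the non-prepped UPDATE / DELETE / MERGE INTO write path.
    spark.sql("set hoodie.spark.sql.optimized.writes.enable=false")
    spark.sql("update hudi_tbl set price = 20 where id = 1")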

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieInternalConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieInternalConfig.java
index a1c575c4f41..797df196441 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieInternalConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieInternalConfig.java
@@ -46,14 +46,6 @@ public class HoodieInternalConfig extends HoodieConfig {
       .withDocumentation("For SQL operations, if enables bulk_insert 
operation, "
           + "this configure will take effect to decide overwrite whole table 
or partitions specified");
 
-  public static final ConfigProperty<Boolean> SQL_MERGE_INTO_WRITES = ConfigProperty
-      .key("hoodie.internal.sql.merge.into.writes")
-      .defaultValue(false)
-      .markAdvanced()
-      .sinceVersion("0.14.0")
-      .withDocumentation("This internal config is used by Merge Into SQL logic only to mark such use case "
-          + "and let the core components know it should handle the write differently");
-
   /**
    * Returns if partition records are sorted or not.
    *
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 5f594e553af..1581e21c070 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -721,6 +721,12 @@ public class HoodieWriteConfig extends HoodieConfig {
           + "The class must be a subclass of 
`org.apache.hudi.callback.HoodieClientInitCallback`."
           + "By default, no Hudi client init callback is executed.");
 
+  /**
+   * Config key with boolean value that indicates whether record being written during MERGE INTO Spark SQL
+   * operation are already prepped.
+   */
+  public static final String SPARK_SQL_MERGE_INTO_PREPPED_KEY = "_hoodie.spark.sql.merge.into.prepped";
+
   private ConsistencyGuardConfig consistencyGuardConfig;
   private FileSystemRetryConfig fileSystemRetryConfig;
 
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndexFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndexFactory.java
index c908f40e4f0..eebaf0f05ba 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndexFactory.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndexFactory.java
@@ -21,7 +21,6 @@ package org.apache.hudi.index;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.config.HoodieInternalConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieIndexException;
@@ -44,9 +43,8 @@ import java.io.IOException;
  */
 public final class SparkHoodieIndexFactory {
   public static HoodieIndex createIndex(HoodieWriteConfig config) {
-    boolean mergeIntoWrites = config.getProps().getBoolean(HoodieInternalConfig.SQL_MERGE_INTO_WRITES.key(),
-        HoodieInternalConfig.SQL_MERGE_INTO_WRITES.defaultValue());
-    if (mergeIntoWrites) {
+    boolean sqlMergeIntoPrepped = config.getProps().getBoolean(HoodieWriteConfig.SPARK_SQL_MERGE_INTO_PREPPED_KEY, false);
+    if (sqlMergeIntoPrepped) {
       return new HoodieInternalProxyIndex(config);
     }
     // first use index class config to create index.
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java
index 235bd63b99f..cceaad0a785 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java
@@ -45,7 +45,7 @@ import java.util.HashMap;
 import java.util.Locale;
 import java.util.Map;
 
-import static org.apache.hudi.config.HoodieInternalConfig.SQL_MERGE_INTO_WRITES;
+import static org.apache.hudi.config.HoodieWriteConfig.SPARK_SQL_MERGE_INTO_PREPPED_KEY;
 import static org.apache.hudi.config.HoodieWriteConfig.KEYGENERATOR_TYPE;
 import static org.apache.hudi.keygen.KeyGenUtils.inferKeyGeneratorType;
 
@@ -80,7 +80,7 @@ public class HoodieSparkKeyGeneratorFactory {
     boolean autoRecordKeyGen = KeyGenUtils.enableAutoGenerateRecordKeys(props)
        //Need to prevent overwriting the keygen for spark sql merge into because we need to extract
        //the recordkey from the meta cols if it exists. Sql keygen will use pkless keygen if needed.
-        && !props.getBoolean(SQL_MERGE_INTO_WRITES.key(), SQL_MERGE_INTO_WRITES.defaultValue());
+        && !props.getBoolean(SPARK_SQL_MERGE_INTO_PREPPED_KEY, false);
     try {
      KeyGenerator keyGenerator = (KeyGenerator) ReflectionUtils.loadClass(keyGeneratorClass, props);
       if (autoRecordKeyGen) {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 841f1088db6..6fb84932c13 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -311,9 +311,10 @@ object DataSourceWriteOptions {
     .withDocumentation("The table type for the underlying data, for this 
write. This can’t change between writes.")
 
   /**
-   * Config key with boolean value that indicates whether record being written is already prepped.
+   * Config key with boolean value that indicates whether record being written during UPDATE or DELETE Spark SQL
+   * operations are already prepped.
    */
-  val DATASOURCE_WRITE_PREPPED_KEY = "_hoodie.datasource.write.prepped";
+  val SPARK_SQL_WRITES_PREPPED_KEY = "_hoodie.spark.sql.writes.prepped";
 
   /**
     * May be derive partition path from incoming df if not explicitly set.
@@ -641,12 +642,12 @@ object DataSourceWriteOptions {
 
   val DROP_PARTITION_COLUMNS: ConfigProperty[java.lang.Boolean] = HoodieTableConfig.DROP_PARTITION_COLUMNS
 
-  val ENABLE_OPTIMIZED_SQL_WRITES: ConfigProperty[String] = ConfigProperty
-    .key("hoodie.spark.sql.writes.optimized.enable")
+  val SPARK_SQL_OPTIMIZED_WRITES: ConfigProperty[String] = ConfigProperty
+    .key("hoodie.spark.sql.optimized.writes.enable")
     .defaultValue("true")
     .markAdvanced()
     .sinceVersion("0.14.0")
-    .withDocumentation("Controls whether spark sql optimized update is 
enabled.")
+    .withDocumentation("Controls whether spark sql prepped update, delete, and 
merge are enabled.")
 
   /** @deprecated Use {@link HIVE_ASSUME_DATE_PARTITION} and its methods instead */
   @Deprecated
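
Besides the session-level `set` command, the renamed option can also be supplied per write as a datasource option, as the HoodieJavaApp change later in this diff does. A minimal Scala sketch under that assumption (`df` and `basePath` are illustrative placeholders, not part of this commit):

    import org.apache.hudi.DataSourceWriteOptions
    import org.apache.spark.sql.SaveMode

    // Toggle prepped (optimized) SQL writes for this particular write.
    df.write.format("hudi")
      .option(DataSourceWriteOptions.SPARK_SQL_OPTIMIZED_WRITES.key, "true")
      .mode(SaveMode.Append)
      .save(basePath)
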
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index 1b961ba411a..8fe36ec71be 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -19,7 +19,7 @@ package org.apache.hudi
 
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.DataSourceReadOptions._
-import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, DATASOURCE_WRITE_PREPPED_KEY, OPERATION, STREAMING_CHECKPOINT_IDENTIFIER}
+import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, SPARK_SQL_WRITES_PREPPED_KEY, OPERATION, STREAMING_CHECKPOINT_IDENTIFIER}
 import org.apache.hudi.cdc.CDCRelation
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_READ}
@@ -29,9 +29,7 @@ import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.util.ConfigUtils
 import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
-import org.apache.hudi.config.HoodieInternalConfig
-import org.apache.hudi.config.HoodieInternalConfig.SQL_MERGE_INTO_WRITES
-import org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE
+import org.apache.hudi.config.HoodieWriteConfig.{WRITE_CONCURRENCY_MODE, SPARK_SQL_MERGE_INTO_PREPPED_KEY}
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.util.PathUtils
 import org.apache.spark.sql.execution.streaming.{Sink, Source}
@@ -146,9 +144,7 @@ class DefaultSource extends RelationProvider
                               mode: SaveMode,
                               optParams: Map[String, String],
                               rawDf: DataFrame): BaseRelation = {
-    val df = if (optParams.getOrDefault(DATASOURCE_WRITE_PREPPED_KEY,
-      optParams.getOrDefault(SQL_MERGE_INTO_WRITES.key(), SQL_MERGE_INTO_WRITES.defaultValue().toString))
-      .equalsIgnoreCase("true")) {
+    val df = if (optParams.getOrDefault(SPARK_SQL_WRITES_PREPPED_KEY, "false").toBoolean || optParams.getOrDefault(SPARK_SQL_MERGE_INTO_PREPPED_KEY, "false").toBoolean) {
       rawDf // Don't remove meta columns for prepped write.
     } else {
       rawDf.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala: _*)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala
index 09d6c7a106a..d59edc64bf8 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala
@@ -57,7 +57,7 @@ object HoodieCreateRecordUtils {
                                        operation: WriteOperationType,
                                        instantTime: String,
                                        isPrepped: Boolean,
-                                       mergeIntoWrites: Boolean)
+                                       sqlMergeIntoPrepped: Boolean)
 
   def createHoodieRecordRdd(args: createHoodieRecordRddArgs) = {
     val df = args.df
@@ -70,7 +70,7 @@ object HoodieCreateRecordUtils {
     val operation = args.operation
     val instantTime = args.instantTime
     val isPrepped = args.isPrepped
-    val mergeIntoWrites = args.mergeIntoWrites
+    val sqlMergeIntoPrepped = args.sqlMergeIntoPrepped
 
     val shouldDropPartitionColumns = config.getBoolean(DataSourceWriteOptions.DROP_PARTITION_COLUMNS)
     val recordType = config.getRecordMerger.getRecordType
@@ -127,8 +127,8 @@ object HoodieCreateRecordUtils {
             }
 
             val (hoodieKey: HoodieKey, recordLocation: Option[HoodieRecordLocation]) = HoodieCreateRecordUtils.getHoodieKeyAndMaybeLocationFromAvroRecord(keyGenerator, avroRec,
-              isPrepped, mergeIntoWrites)
-            val avroRecWithoutMeta: GenericRecord = if (isPrepped || mergeIntoWrites) {
+              isPrepped, sqlMergeIntoPrepped)
+            val avroRecWithoutMeta: GenericRecord = if (isPrepped || sqlMergeIntoPrepped) {
               HoodieAvroUtils.rewriteRecord(avroRec, HoodieAvroUtils.removeMetadataFields(dataFileSchema))
             } else {
               avroRec
@@ -184,7 +184,7 @@ object HoodieCreateRecordUtils {
               validateMetaFieldsInSparkRecords(sourceStructType)
               validatePreppedRecord = false
             }
-            val (key: HoodieKey, recordLocation: Option[HoodieRecordLocation]) = HoodieCreateRecordUtils.getHoodieKeyAndMayBeLocationFromSparkRecord(sparkKeyGenerator, sourceRow, sourceStructType, isPrepped, mergeIntoWrites)
+            val (key: HoodieKey, recordLocation: Option[HoodieRecordLocation]) = HoodieCreateRecordUtils.getHoodieKeyAndMayBeLocationFromSparkRecord(sparkKeyGenerator, sourceRow, sourceStructType, isPrepped, sqlMergeIntoPrepped)
 
             val targetRow = finalStructTypeRowWriter(sourceRow)
             var hoodieSparkRecord = new HoodieSparkRecord(key, targetRow, dataFileStructType, false)
@@ -220,8 +220,8 @@ object HoodieCreateRecordUtils {
   }
 
   def getHoodieKeyAndMaybeLocationFromAvroRecord(keyGenerator: Option[BaseKeyGenerator], avroRec: GenericRecord,
-                                                 isPrepped: Boolean, mergeIntoWrites: Boolean): (HoodieKey, Option[HoodieRecordLocation]) = {
-    //use keygen for mergeIntoWrites recordKey and partitionPath because the keygenerator handles
+                                                 isPrepped: Boolean, sqlMergeIntoPrepped: Boolean): (HoodieKey, Option[HoodieRecordLocation]) = {
+    //use keygen for sqlMergeIntoPrepped recordKey and partitionPath because the keygenerator handles
     //fetching from the meta fields if they are populated and otherwise doing keygen
     val recordKey = if (isPrepped) {
       avroRec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString
@@ -236,13 +236,13 @@ object HoodieCreateRecordUtils {
     }
 
     val hoodieKey = new HoodieKey(recordKey, partitionPath)
-    val instantTime: Option[String] = if (isPrepped || mergeIntoWrites) {
+    val instantTime: Option[String] = if (isPrepped || sqlMergeIntoPrepped) {
       Option(avroRec.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD)).map(_.toString)
     }
     else {
       None
     }
-    val fileName: Option[String] = if (isPrepped || mergeIntoWrites) {
+    val fileName: Option[String] = if (isPrepped || sqlMergeIntoPrepped) {
       Option(avroRec.get(HoodieRecord.FILENAME_METADATA_FIELD)).map(_.toString)
     }
     else {
@@ -259,8 +259,8 @@ object HoodieCreateRecordUtils {
 
   def getHoodieKeyAndMayBeLocationFromSparkRecord(sparkKeyGenerator: Option[SparkKeyGeneratorInterface],
                                                   sourceRow: InternalRow, schema: StructType,
-                                                  isPrepped: Boolean, mergeIntoWrites: Boolean): (HoodieKey, Option[HoodieRecordLocation]) = {
-    //use keygen for mergeIntoWrites recordKey and partitionPath because the keygenerator handles
+                                                  isPrepped: Boolean, sqlMergeIntoPrepped: Boolean): (HoodieKey, Option[HoodieRecordLocation]) = {
+    //use keygen for sqlMergeIntoPrepped recordKey and partitionPath because the keygenerator handles
     //fetching from the meta fields if they are populated and otherwise doing keygen
     val recordKey = if (isPrepped) {
       sourceRow.getString(HoodieRecord.RECORD_KEY_META_FIELD_ORD)
@@ -274,13 +274,13 @@ object HoodieCreateRecordUtils {
       sparkKeyGenerator.get.getPartitionPath(sourceRow, schema).toString
     }
 
-    val instantTime: Option[String] = if (isPrepped || mergeIntoWrites) {
+    val instantTime: Option[String] = if (isPrepped || sqlMergeIntoPrepped) {
       Option(sourceRow.getString(HoodieRecord.COMMIT_TIME_METADATA_FIELD_ORD))
     } else {
       None
     }
 
-    val fileName: Option[String] = if (isPrepped || mergeIntoWrites) {
+    val fileName: Option[String] = if (isPrepped || sqlMergeIntoPrepped) {
       Option(sourceRow.getString(HoodieRecord.FILENAME_META_FIELD_ORD))
     } else {
       None
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 272b55b7f20..ca1359578d1 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -45,7 +45,7 @@ import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, T
 import org.apache.hudi.common.util.{CommitUtils, StringUtils, Option => HOption}
 import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME}
 import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig}
-import org.apache.hudi.config.HoodieInternalConfig.SQL_MERGE_INTO_WRITES
+import org.apache.hudi.config.HoodieWriteConfig.SPARK_SQL_MERGE_INTO_PREPPED_KEY
 import org.apache.hudi.exception.{HoodieException, SchemaCompatibilityException}
 import org.apache.hudi.hive.{HiveSyncConfigHolder, HiveSyncTool}
 import org.apache.hudi.index.HoodieIndex
@@ -91,6 +91,14 @@ object HoodieSparkSqlWriter {
     ConfigProperty.key("hoodie.internal.write.schema.canonicalize.nullable")
       .defaultValue(true)
 
+  /**
+   * For merge into from spark-sql, we need some special handling. for eg, schema validation should be disabled
+   * for writes from merge into. This config is used for internal purposes.
+   */
+  val SQL_MERGE_INTO_WRITES: ConfigProperty[Boolean] =
+    ConfigProperty.key("hoodie.internal.sql.merge.into.writes")
+      .defaultValue(false)
+
   /**
    * For spark streaming use-cases, holds the batch Id.
    */
@@ -250,7 +258,7 @@ object HoodieSparkSqlWriter {
           case WriteOperationType.DELETE | WriteOperationType.DELETE_PREPPED =>
             val genericRecords = HoodieSparkUtils.createRdd(df, avroRecordName, avroRecordNamespace)
             // Convert to RDD[HoodieKey]
-            val isPrepped = hoodieConfig.getBooleanOrDefault(DATASOURCE_WRITE_PREPPED_KEY, false)
+            val isPrepped = hoodieConfig.getBooleanOrDefault(SPARK_SQL_WRITES_PREPPED_KEY, false)
             val keyGenerator: Option[BaseKeyGenerator] = if (isPrepped) {
               None
             } else {
@@ -348,10 +356,9 @@ object HoodieSparkSqlWriter {
             }
 
             // Remove meta columns from writerSchema if isPrepped is true.
-            val isPrepped = hoodieConfig.getBooleanOrDefault(DATASOURCE_WRITE_PREPPED_KEY, false)
-            val mergeIntoWrites = parameters.getOrDefault(SQL_MERGE_INTO_WRITES.key(),
-               SQL_MERGE_INTO_WRITES.defaultValue.toString).toBoolean
-            val processedDataSchema = if (isPrepped || mergeIntoWrites) {
+            val isPrepped = hoodieConfig.getBooleanOrDefault(SPARK_SQL_WRITES_PREPPED_KEY, false)
+            val sqlMergeIntoPrepped = parameters.getOrDefault(SPARK_SQL_MERGE_INTO_PREPPED_KEY, "false").toBoolean
+            val processedDataSchema = if (isPrepped || sqlMergeIntoPrepped) {
               HoodieAvroUtils.removeMetadataFields(dataFileSchema)
             } else {
               dataFileSchema
@@ -388,7 +395,7 @@ object HoodieSparkSqlWriter {
             val hoodieRecords =
               HoodieCreateRecordUtils.createHoodieRecordRdd(HoodieCreateRecordUtils.createHoodieRecordRddArgs(df,
                 writeConfig, parameters, avroRecordName, avroRecordNamespace, writerSchema,
-                processedDataSchema, operation, instantTime, isPrepped, mergeIntoWrites))
+                processedDataSchema, operation, instantTime, isPrepped, sqlMergeIntoPrepped))
 
             val dedupedHoodieRecords =
               if (hoodieConfig.getBoolean(INSERT_DROP_DUPS)) {
@@ -453,7 +460,7 @@ object HoodieSparkSqlWriter {
        // in the table's one we want to proceed aligning nullability constraints w/ the table's schema
        val shouldCanonicalizeNullable = opts.getOrDefault(CANONICALIZE_NULLABLE.key,
          CANONICALIZE_NULLABLE.defaultValue.toString).toBoolean
-        val mergeIntoWrites = opts.getOrDefault(HoodieInternalConfig.SQL_MERGE_INTO_WRITES.key(),
+        val mergeIntoWrites = opts.getOrDefault(SQL_MERGE_INTO_WRITES.key(),
           SQL_MERGE_INTO_WRITES.defaultValue.toString).toBoolean
 
         val canonicalizedSourceSchema = if (shouldCanonicalizeNullable) {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index cf8d85f704e..405e761635a 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -82,7 +82,6 @@ object HoodieWriterUtils {
     hoodieConfig.setDefaultValue(RECONCILE_SCHEMA)
     hoodieConfig.setDefaultValue(DROP_PARTITION_COLUMNS)
     hoodieConfig.setDefaultValue(KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED)
-    hoodieConfig.setDefaultValue(ENABLE_OPTIMIZED_SQL_WRITES)
     Map() ++ hoodieConfig.getProps.asScala ++ globalProps ++ DataSourceOptionsHelper.translateConfigurations(parameters)
   }
 
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
index d10b3d529f5..55f2ebb8ac6 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.hudi.command
 
-import org.apache.hudi.DataSourceWriteOptions.{DATASOURCE_WRITE_PREPPED_KEY, ENABLE_OPTIMIZED_SQL_WRITES}
+import org.apache.hudi.DataSourceWriteOptions.{SPARK_SQL_WRITES_PREPPED_KEY, SPARK_SQL_OPTIMIZED_WRITES}
 import org.apache.hudi.SparkAdapterSupport
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
@@ -40,8 +40,8 @@ case class DeleteHoodieTableCommand(dft: DeleteFromTable) extends HoodieLeafRunn
 
     val condition = sparkAdapter.extractDeleteCondition(dft)
 
-    val targetLogicalPlan = if (sparkSession.sqlContext.conf.getConfString(ENABLE_OPTIMIZED_SQL_WRITES.key()
-      , ENABLE_OPTIMIZED_SQL_WRITES.defaultValue()) == "true") {
+    val targetLogicalPlan = if (sparkSession.sqlContext.conf.getConfString(SPARK_SQL_OPTIMIZED_WRITES.key()
+      , SPARK_SQL_OPTIMIZED_WRITES.defaultValue()) == "true") {
       dft.table
     } else {
       stripMetaFieldAttributes(dft.table)
@@ -53,9 +53,9 @@ case class DeleteHoodieTableCommand(dft: DeleteFromTable) extends HoodieLeafRunn
       targetLogicalPlan
     }
 
-    val config = if (sparkSession.sqlContext.conf.getConfString(ENABLE_OPTIMIZED_SQL_WRITES.key()
-      , ENABLE_OPTIMIZED_SQL_WRITES.defaultValue()) == "true") {
-      buildHoodieDeleteTableConfig(catalogTable, sparkSession) + (DATASOURCE_WRITE_PREPPED_KEY -> "true")
+    val config = if (sparkSession.sqlContext.conf.getConfString(SPARK_SQL_OPTIMIZED_WRITES.key()
+      , SPARK_SQL_OPTIMIZED_WRITES.defaultValue()) == "true") {
+      buildHoodieDeleteTableConfig(catalogTable, sparkSession) + (SPARK_SQL_WRITES_PREPPED_KEY -> "true")
     } else {
       buildHoodieDeleteTableConfig(catalogTable, sparkSession)
     }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index d4c72e5bfb4..eba75c95452 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -25,7 +25,7 @@ import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.common.model.HoodieAvroRecordMerger
 import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.config.HoodieWriteConfig.{AVRO_SCHEMA_VALIDATE_ENABLE, SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP, TBL_NAME}
-import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig}
+import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hive.HiveSyncConfigHolder
 import org.apache.hudi.sync.common.HoodieSyncConfig
@@ -342,7 +342,12 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
     val tableMetaCols = mergeInto.targetTable.output.filter(a => isMetaField(a.name))
     val joinData = sparkAdapter.getCatalystPlanUtils.createMITJoin(mergeInto.sourceTable, mergeInto.targetTable, LeftOuter, Some(mergeInto.mergeCondition), "NONE")
     val incomingDataCols = joinData.output.filterNot(mergeInto.targetTable.outputSet.contains)
-    val projectedJoinPlan = Project(tableMetaCols ++ incomingDataCols, joinData)
+    val projectedJoinPlan = if (sparkSession.sqlContext.conf.getConfString(SPARK_SQL_OPTIMIZED_WRITES.key(), SPARK_SQL_OPTIMIZED_WRITES.defaultValue()) == "true") {
+      Project(tableMetaCols ++ incomingDataCols, joinData)
+    } else {
+      Project(incomingDataCols, joinData)
+    }
+
     val projectedJoinOutput = projectedJoinPlan.output
 
     val requiredAttributesMap = recordKeyAttributeToConditionExpression ++ preCombineAttributeAssociatedExpression
@@ -617,6 +622,15 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
 
     val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable, tableConfig)
 
+    val enableOptimizedMerge = sparkSession.sqlContext.conf.getConfString(SPARK_SQL_OPTIMIZED_WRITES.key(),
+      SPARK_SQL_OPTIMIZED_WRITES.defaultValue())
+
+    val keyGeneratorClassName = if (enableOptimizedMerge == "true") {
+      classOf[MergeIntoKeyGenerator].getCanonicalName
+    } else {
+      classOf[SqlKeyGenerator].getCanonicalName
+    }
+
     val overridingOpts = Map(
       "path" -> path,
       RECORDKEY_FIELD.key -> tableConfig.getRawRecordKeyFieldProp,
@@ -625,7 +639,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
       PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
       HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
       URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
-      KEYGENERATOR_CLASS_NAME.key -> classOf[MergeIntoKeyGenerator].getCanonicalName,
+      KEYGENERATOR_CLASS_NAME.key -> keyGeneratorClassName,
       SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
       HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
       HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
@@ -648,7 +662,8 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
       RECONCILE_SCHEMA.key -> "false",
       CANONICALIZE_NULLABLE.key -> "false",
       SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key -> "true",
-      HoodieInternalConfig.SQL_MERGE_INTO_WRITES.key -> "true",
+      HoodieSparkSqlWriter.SQL_MERGE_INTO_WRITES.key -> "true",
+      HoodieWriteConfig.SPARK_SQL_MERGE_INTO_PREPPED_KEY -> enableOptimizedMerge,
       HoodieWriteConfig.COMBINE_BEFORE_UPSERT.key() -> (!StringUtils.isNullOrEmpty(preCombineField)).toString
     )
 
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala
index 7d6d5f39bb1..e35e4939f04 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.hudi.command
 
-import org.apache.hudi.DataSourceWriteOptions.{DATASOURCE_WRITE_PREPPED_KEY, ENABLE_OPTIMIZED_SQL_WRITES}
+import org.apache.hudi.DataSourceWriteOptions.{SPARK_SQL_WRITES_PREPPED_KEY, SPARK_SQL_OPTIMIZED_WRITES}
 import org.apache.hudi.SparkAdapterSupport
 import org.apache.spark.sql.HoodieCatalystExpressionUtils.attributeEquals
 import org.apache.spark.sql._
@@ -44,8 +44,8 @@ case class UpdateHoodieTableCommand(ut: UpdateTable) extends HoodieLeafRunnableC
       case Assignment(attr: AttributeReference, value) => attr -> value
     }
 
-    val filteredOutput = if (sparkSession.sqlContext.conf.getConfString(ENABLE_OPTIMIZED_SQL_WRITES.key()
-      , ENABLE_OPTIMIZED_SQL_WRITES.defaultValue()) == "true") {
+    val filteredOutput = if (sparkSession.sqlContext.conf.getConfString(SPARK_SQL_OPTIMIZED_WRITES.key()
+      , SPARK_SQL_OPTIMIZED_WRITES.defaultValue()) == "true") {
       ut.table.output
     } else {
       removeMetaFields(ut.table.output)
@@ -63,10 +63,10 @@ case class UpdateHoodieTableCommand(ut: UpdateTable) extends HoodieLeafRunnableC
     val condition = ut.condition.getOrElse(TrueLiteral)
     val filteredPlan = Filter(condition, Project(targetExprs, ut.table))
 
-    val config = if (sparkSession.sqlContext.conf.getConfString(ENABLE_OPTIMIZED_SQL_WRITES.key()
-      , ENABLE_OPTIMIZED_SQL_WRITES.defaultValue()) == "true") {
+    val config = if (sparkSession.sqlContext.conf.getConfString(SPARK_SQL_OPTIMIZED_WRITES.key()
+      , SPARK_SQL_OPTIMIZED_WRITES.defaultValue()) == "true") {
       // Set config to show that this is a prepped write.
-      buildHoodieConfig(catalogTable) + (DATASOURCE_WRITE_PREPPED_KEY -> "true")
+      buildHoodieConfig(catalogTable) + (SPARK_SQL_WRITES_PREPPED_KEY -> "true")
     } else {
       buildHoodieConfig(catalogTable)
     }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java
index 448215a9b0a..844f134ed83 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java
@@ -161,7 +161,7 @@ public class HoodieJavaApp {
         .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE().key(), "false")
         .option(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE().key(), "true")
         .option(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE().key(), "true")
-        .option(DataSourceWriteOptions.ENABLE_OPTIMIZED_SQL_WRITES().key(), "true")
+        .option(DataSourceWriteOptions.SPARK_SQL_OPTIMIZED_WRITES().key(), DataSourceWriteOptions.SPARK_SQL_OPTIMIZED_WRITES().defaultValue())
         // This will remove any existing data at path below, and create a
         .mode(SaveMode.Overwrite);
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteTable.scala
index 6233884a63e..bc87405b9f9 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteTable.scala
@@ -26,7 +26,7 @@ class TestDeleteTable extends HoodieSparkSqlTestBase {
 
   test("Test Delete Table") {
     withTempDir { tmp =>
-      Seq(true, false).foreach { optimizedSqlEnabled =>
+      Seq(true, false).foreach { sparkSqlOptimizedWrites =>
         Seq("cow", "mor").foreach { tableType =>
           val tableName = generateTableName
           // create table
@@ -47,7 +47,7 @@ class TestDeleteTable extends HoodieSparkSqlTestBase {
        """.stripMargin)
 
           // test with optimized sql writes enabled / disabled.
-          spark.sql(s"set 
hoodie.spark.sql.writes.optimized.enable=$optimizedSqlEnabled")
+          spark.sql(s"set 
${SPARK_SQL_OPTIMIZED_WRITES.key()}=$sparkSqlOptimizedWrites")
 
           // insert data to table
           spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
@@ -97,7 +97,7 @@ class TestDeleteTable extends HoodieSparkSqlTestBase {
    """.stripMargin)
 
         // test with optimized sql writes enabled.
-        spark.sql(s"set hoodie.spark.sql.writes.optimized.enable=true")
+        spark.sql(s"set ${SPARK_SQL_OPTIMIZED_WRITES.key()}=true")
 
         // insert data to table
         spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
@@ -279,7 +279,7 @@ class TestDeleteTable extends HoodieSparkSqlTestBase {
 
   Seq(false, true).foreach { urlencode =>
     test(s"Test Delete single-partition table' partitions, urlencode: 
$urlencode") {
-      Seq(true, false).foreach { optimizedSqlEnabled =>
+      Seq(true, false).foreach { sparkSqlOptimizedWrites =>
         withTempDir { tmp =>
           val tableName = generateTableName
           val tablePath = s"${tmp.getCanonicalPath}/$tableName"
@@ -308,7 +308,7 @@ class TestDeleteTable extends HoodieSparkSqlTestBase {
                |""".stripMargin)
 
           // test with optimized sql writes enabled / disabled.
-          spark.sql(s"set 
hoodie.spark.sql.writes.optimized.enable=$optimizedSqlEnabled")
+          spark.sql(s"set 
${SPARK_SQL_OPTIMIZED_WRITES.key()}=$sparkSqlOptimizedWrites")
 
           // delete 2021-10-01 partition
           if (urlencode) {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
index 7ee3e838a2e..63adacbf129 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.hudi
 
+import org.apache.hudi.DataSourceWriteOptions.SPARK_SQL_OPTIMIZED_WRITES
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.{DataSourceReadOptions, HoodieDataSourceHelpers, HoodieSparkUtils, ScalaAssertionSupport}
 import org.apache.spark.sql.internal.SQLConf
@@ -24,94 +25,99 @@ import org.apache.spark.sql.internal.SQLConf
 class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSupport {
 
   test("Test MergeInto Basic") {
-    withRecordType()(withTempDir { tmp =>
-      spark.sql("set hoodie.payload.combined.schema.validate = true")
-      val tableName = generateTableName
-      // Create table
-      spark.sql(
-        s"""
-           |create table $tableName (
-           |  id int,
-           |  name string,
-           |  price double,
-           |  ts long
-           |) using hudi
-           | location '${tmp.getCanonicalPath}'
-           | tblproperties (
-           |  primaryKey ='id',
-           |  preCombineField = 'ts'
-           | )
+    Seq(true, false).foreach { sparkSqlOptimizedWrites =>
+      withRecordType()(withTempDir { tmp =>
+        spark.sql("set hoodie.payload.combined.schema.validate = false")
+        val tableName = generateTableName
+        // Create table
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  ts long
+             |) using hudi
+             | location '${tmp.getCanonicalPath}'
+             | tblproperties (
+             |  primaryKey ='id',
+             |  preCombineField = 'ts'
+             | )
        """.stripMargin)
 
-      // First merge with a extra input field 'flag' (insert a new record)
-      spark.sql(
-        s"""
-           | merge into $tableName
-           | using (
-           |  select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '1' as flag
-           | ) s0
-           | on s0.id = $tableName.id
-           | when matched and flag = '1' then update set
-           | id = s0.id, name = s0.name, price = s0.price, ts = s0.ts
-           | when not matched and flag = '1' then insert *
+        // test with optimized sql merge enabled / disabled.
+        spark.sql(s"set 
${SPARK_SQL_OPTIMIZED_WRITES.key()}=$sparkSqlOptimizedWrites")
+
+        // First merge with a extra input field 'flag' (insert a new record)
+        spark.sql(
+          s"""
+             | merge into $tableName
+             | using (
+             |  select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '1' as flag
+             | ) s0
+             | on s0.id = $tableName.id
+             | when matched and flag = '1' then update set
+             | id = s0.id, name = s0.name, price = s0.price, ts = s0.ts
+             | when not matched and flag = '1' then insert *
        """.stripMargin)
-      checkAnswer(s"select id, name, price, ts from $tableName")(
-        Seq(1, "a1", 10.0, 1000)
-      )
+        checkAnswer(s"select id, name, price, ts from $tableName")(
+          Seq(1, "a1", 10.0, 1000)
+        )
 
-      // Second merge (update the record)
-      spark.sql(
-        s"""
-           | merge into $tableName
-           | using (
-           |  select 1 as id, 'a1' as name, 10 as price, 1001 as ts
-           | ) s0
-           | on s0.id = $tableName.id
-           | when matched then update set
-           | id = s0.id, name = s0.name, price = s0.price + $tableName.price, ts = s0.ts
-           | when not matched then insert *
+        // Second merge (update the record)
+        spark.sql(
+          s"""
+             | merge into $tableName
+             | using (
+             |  select 1 as id, 'a1' as name, 10 as price, 1001 as ts
+             | ) s0
+             | on s0.id = $tableName.id
+             | when matched then update set
+             | id = s0.id, name = s0.name, price = s0.price + $tableName.price, ts = s0.ts
+             | when not matched then insert *
        """.stripMargin)
-      checkAnswer(s"select id, name, price, ts from $tableName")(
-        Seq(1, "a1", 20.0, 1001)
-      )
+        checkAnswer(s"select id, name, price, ts from $tableName")(
+          Seq(1, "a1", 20.0, 1001)
+        )
 
-      // the third time merge (update & insert the record)
-      spark.sql(
-        s"""
-           | merge into $tableName
-           | using (
-           |  select * from (
-           |  select 1 as id, 'a1' as name, 10 as price, 1002 as ts
-           |  union all
-           |  select 2 as id, 'a2' as name, 12 as price, 1001 as ts
-           |  )
-           | ) s0
-           | on s0.id = $tableName.id
-           | when matched then update set
-           | id = s0.id, name = s0.name, price = s0.price + $tableName.price, ts = s0.ts
-           | when not matched and s0.id % 2 = 0 then insert *
+        // the third time merge (update & insert the record)
+        spark.sql(
+          s"""
+             | merge into $tableName
+             | using (
+             |  select * from (
+             |  select 1 as id, 'a1' as name, 10 as price, 1002 as ts
+             |  union all
+             |  select 2 as id, 'a2' as name, 12 as price, 1001 as ts
+             |  )
+             | ) s0
+             | on s0.id = $tableName.id
+             | when matched then update set
+             | id = s0.id, name = s0.name, price = s0.price + $tableName.price, ts = s0.ts
+             | when not matched and s0.id % 2 = 0 then insert *
        """.stripMargin)
-      checkAnswer(s"select id, name, price, ts from $tableName")(
-        Seq(1, "a1", 30.0, 1002),
-        Seq(2, "a2", 12.0, 1001)
-      )
+        checkAnswer(s"select id, name, price, ts from $tableName")(
+          Seq(1, "a1", 30.0, 1002),
+          Seq(2, "a2", 12.0, 1001)
+        )
 
-      // the fourth merge (delete the record)
-      spark.sql(
-        s"""
-           | merge into $tableName
-           | using (
-           |  select 1 as id, 'a1' as name, 12 as price, 1003 as ts
-           | ) s0
-           | on s0.id = $tableName.id
-           | when matched and s0.id != 1 then update set
-           |    id = s0.id, name = s0.name, price = s0.price, ts = s0.ts
-           | when matched and s0.id = 1 then delete
-           | when not matched then insert *
+        // the fourth merge (delete the record)
+        spark.sql(
+          s"""
+             | merge into $tableName
+             | using (
+             |  select 1 as id, 'a1' as name, 12 as price, 1003 as ts
+             | ) s0
+             | on s0.id = $tableName.id
+             | when matched and s0.id != 1 then update set
+             |    id = s0.id, name = s0.name, price = s0.price, ts = s0.ts
+             | when matched and s0.id = 1 then delete
+             | when not matched then insert *
        """.stripMargin)
-      val cnt = spark.sql(s"select * from $tableName where id = 1").count()
-      assertResult(0)(cnt)
-    })
+        val cnt = spark.sql(s"select * from $tableName where id = 1").count()
+        assertResult(0)(cnt)
+      })
+    }
   }
 
   /**
@@ -1187,41 +1193,47 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
   }
 
   test("Test MergeInto with partial insert") {
-    withRecordType()(withTempDir {tmp =>
-      spark.sql("set hoodie.payload.combined.schema.validate = true")
-      // Create a partitioned mor table
-      val tableName = generateTableName
-      spark.sql(
-        s"""
-           | create table $tableName (
-           |  id bigint,
-           |  name string,
-           |  price double,
-           |  dt string
-           | ) using hudi
-           | tblproperties (
-           |  type = 'mor',
-           |  primaryKey = 'id'
-           | )
-           | partitioned by(dt)
-           | location '${tmp.getCanonicalPath}'
+    Seq(true, false).foreach { sparkSqlOptimizedWrites =>
+      withRecordType()(withTempDir { tmp =>
+        spark.sql("set hoodie.payload.combined.schema.validate = true")
+        // Create a partitioned mor table
+        val tableName = generateTableName
+        spark.sql(
+          s"""
+             | create table $tableName (
+             |  id bigint,
+             |  name string,
+             |  price double,
+             |  dt string
+             | ) using hudi
+             | tblproperties (
+             |  type = 'mor',
+             |  primaryKey = 'id'
+             | )
+             | partitioned by(dt)
+             | location '${tmp.getCanonicalPath}'
          """.stripMargin)
 
-      spark.sql(s"insert into $tableName select 1, 'a1', 10, '2021-03-21'")
-      spark.sql(
-        s"""
-           | merge into $tableName as t0
-           | using (
-           |  select 2 as id, 'a2' as name, 10 as price, '2021-03-20' as dt
-           | ) s0
-           | on s0.id = t0.id
-           | when not matched and s0.id % 2 = 0 then insert (id, name, dt)
-           | values(s0.id, s0.name, s0.dt)
+        spark.sql(s"insert into $tableName select 1, 'a1', 10, '2021-03-21'")
+
+        // test with optimized sql merge enabled / disabled.
+        spark.sql(s"set 
${SPARK_SQL_OPTIMIZED_WRITES.key()}=$sparkSqlOptimizedWrites")
+
+        spark.sql(
+          s"""
+             | merge into $tableName as t0
+             | using (
+             |  select 2 as id, 'a2' as name, 10 as price, '2021-03-20' as dt
+             | ) s0
+             | on s0.id = t0.id
+             | when not matched and s0.id % 2 = 0 then insert (id, name, dt)
+             | values(s0.id, s0.name, s0.dt)
          """.stripMargin)
-      checkAnswer(s"select id, name, price, dt from $tableName order by id")(
-        Seq(1, "a1", 10, "2021-03-21"),
-        Seq(2, "a2", null, "2021-03-20")
-      )
-    })
+        checkAnswer(s"select id, name, price, dt from $tableName order by id")(
+          Seq(1, "a1", 10, "2021-03-21"),
+          Seq(2, "a2", null, "2021-03-20")
+        )
+      })
+    }
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTableWithNonRecordKeyField.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTableWithNonRecordKeyField.scala
index 69a40e868ac..dd1d00580dc 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTableWithNonRecordKeyField.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTableWithNonRecordKeyField.scala
@@ -17,127 +17,134 @@
 
 package org.apache.spark.sql.hudi
 
+import org.apache.hudi.DataSourceWriteOptions.SPARK_SQL_OPTIMIZED_WRITES
 import org.apache.hudi.{HoodieSparkUtils, ScalaAssertionSupport}
 
 class TestMergeIntoTableWithNonRecordKeyField extends HoodieSparkSqlTestBase with ScalaAssertionSupport {
 
   test("Test Merge into extra cond") {
-    withTempDir { tmp =>
-      val tableName = generateTableName
-      spark.sql(
-        s"""
-           |create table $tableName (
-           |  id int,
-           |  name string,
-           |  price double,
-           |  ts long
-           |) using hudi
-           | location '${tmp.getCanonicalPath}/$tableName'
-           | tblproperties (
-           |  primaryKey ='id',
-           |  preCombineField = 'ts'
-           | )
+    Seq(true, false).foreach { sparkSqlOptimizedWrites =>
+      withTempDir { tmp =>
+        val tableName = generateTableName
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  ts long
+             |) using hudi
+             | location '${tmp.getCanonicalPath}/$tableName'
+             | tblproperties (
+             |  primaryKey ='id',
+             |  preCombineField = 'ts'
+             | )
        """.stripMargin)
-      val tableName2 = generateTableName
-      spark.sql(
-        s"""
-           |create table $tableName2 (
-           |  id int,
-           |  name string,
-           |  price double,
-           |  ts long
-           |) using hudi
-           | location '${tmp.getCanonicalPath}/$tableName2'
-           | tblproperties (
-           |  primaryKey ='id',
-           |  preCombineField = 'ts'
-           | )
+        val tableName2 = generateTableName
+        spark.sql(
+          s"""
+             |create table $tableName2 (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  ts long
+             |) using hudi
+             | location '${tmp.getCanonicalPath}/$tableName2'
+             | tblproperties (
+             |  primaryKey ='id',
+             |  preCombineField = 'ts'
+             | )
        """.stripMargin)
 
-      spark.sql(
-        s"""
-           |insert into $tableName values
-           |    (1, 'a1', 10, 100),
-           |    (2, 'a2', 20, 200),
-           |    (3, 'a3', 20, 100)
-           |""".stripMargin)
-      spark.sql(
-        s"""
-           |insert into $tableName2 values
-           |    (1, 'u1', 10, 999),
-           |    (3, 'u3', 30, 9999),
-           |    (4, 'u4', 40, 99999)
-           |""".stripMargin)
+        spark.sql(
+          s"""
+             |insert into $tableName values
+             |    (1, 'a1', 10, 100),
+             |    (2, 'a2', 20, 200),
+             |    (3, 'a3', 20, 100)
+             |""".stripMargin)
+        spark.sql(
+          s"""
+             |insert into $tableName2 values
+             |    (1, 'u1', 10, 999),
+             |    (3, 'u3', 30, 9999),
+             |    (4, 'u4', 40, 99999)
+             |""".stripMargin)
 
-      spark.sql(
-        s"""
-           |merge into $tableName as oldData
-           |using $tableName2
-           |on oldData.id = $tableName2.id
-           |when matched and oldData.price = $tableName2.price then update set oldData.name = $tableName2.name
-           |
-           |""".stripMargin)
+        // test with optimized sql merge enabled / disabled.
+        spark.sql(s"set 
${SPARK_SQL_OPTIMIZED_WRITES.key()}=$sparkSqlOptimizedWrites")
 
-      checkAnswer(s"select id, name, price, ts from $tableName")(
-        Seq(1, "u1", 10.0, 100),
-        Seq(3, "a3", 20.0, 100),
-        Seq(2, "a2", 20.0, 200)
-      )
+        spark.sql(
+          s"""
+             |merge into $tableName as oldData
+             |using $tableName2
+             |on oldData.id = $tableName2.id
+             |when matched and oldData.price = $tableName2.price then update set oldData.name = $tableName2.name
+             |
+             |""".stripMargin)
 
-      val errorMessage = if (HoodieSparkUtils.gteqSpark3_1) {
-        "Only simple conditions of the form `t.id = s.id` using primary key or 
partition path " +
-          "columns are allowed on tables with primary key. (illegal column(s) 
used: `price`"
-      } else {
-        "Only simple conditions of the form `t.id = s.id` using primary key or 
partition path " +
-          "columns are allowed on tables with primary key. (illegal column(s) 
used: `price`;"
-      }
+        checkAnswer(s"select id, name, price, ts from $tableName")(
+          Seq(1, "u1", 10.0, 100),
+          Seq(3, "a3", 20.0, 100),
+          Seq(2, "a2", 20.0, 200)
+        )
 
-      checkException(
-        s"""
-           |merge into $tableName as oldData
-           |using $tableName2
-           |on oldData.id = $tableName2.id and oldData.price = $tableName2.price
-           |when matched then update set oldData.name = $tableName2.name
-           |when not matched then insert *
-           |""".stripMargin)(errorMessage)
+        val errorMessage = if (HoodieSparkUtils.gteqSpark3_1) {
+          "Only simple conditions of the form `t.id = s.id` using primary key 
or partition path " +
+            "columns are allowed on tables with primary key. (illegal 
column(s) used: `price`"
+        } else {
+          "Only simple conditions of the form `t.id = s.id` using primary key 
or partition path " +
+            "columns are allowed on tables with primary key. (illegal 
column(s) used: `price`;"
+        }
 
-      //test with multiple pks
-      val tableName3 = generateTableName
-      spark.sql(
-        s"""
-           |create table $tableName3 (
-           |  id int,
-           |  name string,
-           |  price double,
-           |  ts long
-           |) using hudi
-           | location '${tmp.getCanonicalPath}/$tableName3'
-           | tblproperties (
-           |  primaryKey ='id,name',
-           |  preCombineField = 'ts'
-           | )
+        checkException(
+          s"""
+             |merge into $tableName as oldData
+             |using $tableName2
+             |on oldData.id = $tableName2.id and oldData.price = $tableName2.price
+             |when matched then update set oldData.name = $tableName2.name
+             |when not matched then insert *
+             |""".stripMargin)(errorMessage)
+
+        //test with multiple pks
+        val tableName3 = generateTableName
+        spark.sql(
+          s"""
+             |create table $tableName3 (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  ts long
+             |) using hudi
+             | location '${tmp.getCanonicalPath}/$tableName3'
+             | tblproperties (
+             |  primaryKey ='id,name',
+             |  preCombineField = 'ts'
+             | )
        """.stripMargin)
 
-      val errorMessage2 = if (HoodieSparkUtils.gteqSpark3_1) {
-        "Hudi tables with primary key are required to match on all primary key 
colums. Column: 'name' not found"
-      } else {
-        "Hudi tables with primary key are required to match on all primary key 
colums. Column: 'name' not found;"
-      }
+        val errorMessage2 = if (HoodieSparkUtils.gteqSpark3_1) {
+          "Hudi tables with primary key are required to match on all primary 
key colums. Column: 'name' not found"
+        } else {
+          "Hudi tables with primary key are required to match on all primary 
key colums. Column: 'name' not found;"
+        }
 
-      checkException(
-        s"""
-           |merge into $tableName3 as oldData
-           |using $tableName2
-           |on oldData.id = $tableName2.id
-           |when matched then update set oldData.name = $tableName2.name
-           |when not matched then insert *
-           |""".stripMargin)(errorMessage2)
+        checkException(
+          s"""
+             |merge into $tableName3 as oldData
+             |using $tableName2
+             |on oldData.id = $tableName2.id
+             |when matched then update set oldData.name = $tableName2.name
+             |when not matched then insert *
+             |""".stripMargin)(errorMessage2)
+      }
     }
   }
 
   test("Test pkless complex merge cond") {
     withRecordType()(withTempDir { tmp =>
       spark.sql("set hoodie.payload.combined.schema.validate = true")
+      spark.sql(s"set ${SPARK_SQL_OPTIMIZED_WRITES.key()}=true")
       val tableName = generateTableName
       // Create table
       spark.sql(
@@ -212,6 +219,7 @@ class TestMergeIntoTableWithNonRecordKeyField extends HoodieSparkSqlTestBase wit
     for (withPrecombine <- Seq(true, false)) {
       withRecordType()(withTempDir { tmp =>
         spark.sql("set hoodie.payload.combined.schema.validate = true")
+        spark.sql(s"set ${SPARK_SQL_OPTIMIZED_WRITES.key()}=true")
         val tableName = generateTableName
 
        val prekstr = if (withPrecombine) "tblproperties (preCombineField = 'ts')" else ""
@@ -264,6 +272,7 @@ class TestMergeIntoTableWithNonRecordKeyField extends HoodieSparkSqlTestBase wit
   test("Test MergeInto Basic pkless") {
     withRecordType()(withTempDir { tmp =>
       spark.sql("set hoodie.payload.combined.schema.validate = true")
+      spark.sql(s"set ${SPARK_SQL_OPTIMIZED_WRITES.key()}=true")
       val tableName = generateTableName
       // Create table
       spark.sql(
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala
index 23d192ba099..f244167d142 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala
@@ -17,13 +17,14 @@
 
 package org.apache.spark.sql.hudi
 
+import org.apache.hudi.DataSourceWriteOptions.SPARK_SQL_OPTIMIZED_WRITES
 import org.apache.hudi.HoodieSparkUtils.isSpark2
 
 class TestUpdateTable extends HoodieSparkSqlTestBase {
 
   test("Test Update Table") {
     withRecordType()(withTempDir { tmp =>
-      Seq(true, false).foreach { optimizedSqlEnabled =>
+      Seq(true, false).foreach { sparkSqlOptimizedWrites =>
         Seq("cow", "mor").foreach { tableType =>
           val tableName = generateTableName
           // create table
@@ -50,7 +51,7 @@ class TestUpdateTable extends HoodieSparkSqlTestBase {
           )
 
           // test with optimized sql writes enabled / disabled.
-          spark.sql(s"set 
hoodie.spark.sql.writes.optimized.enable=$optimizedSqlEnabled")
+          spark.sql(s"set 
${SPARK_SQL_OPTIMIZED_WRITES.key()}=$sparkSqlOptimizedWrites")
 
           // update data
           spark.sql(s"update $tableName set price = 20 where id = 1")
@@ -95,7 +96,7 @@ class TestUpdateTable extends HoodieSparkSqlTestBase {
         )
 
         // test with optimized sql writes enabled.
-        spark.sql(s"set hoodie.spark.sql.writes.optimized.enable=true")
+        spark.sql(s"set ${SPARK_SQL_OPTIMIZED_WRITES.key()}=true")
 
         // update data
         spark.sql(s"update $tableName set price = 20 where id = 1")
@@ -256,7 +257,7 @@ class TestUpdateTable extends HoodieSparkSqlTestBase {
 
   test("Test decimal type") {
     withTempDir { tmp =>
-      Seq(true, false).foreach { optimizedSqlEnabled =>
+      Seq(true, false).foreach { sparkSqlOptimizedWrites =>
         val tableName = generateTableName
         // create table
         spark.sql(
@@ -283,7 +284,7 @@ class TestUpdateTable extends HoodieSparkSqlTestBase {
         )
 
         // test with optimized sql writes enabled / disabled.
-        spark.sql(s"set 
hoodie.spark.sql.writes.optimized.enable=$optimizedSqlEnabled")
+        spark.sql(s"set 
${SPARK_SQL_OPTIMIZED_WRITES.key()}=$sparkSqlOptimizedWrites")
 
         spark.sql(s"update $tableName set price = 22 where id = 1")
         checkAnswer(s"select id, name, price, ts from $tableName")(
