codope commented on code in PR #9123:
URL: https://github.com/apache/hudi/pull/9123#discussion_r1252918477


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala:
##########
@@ -430,19 +430,23 @@ object DataSourceWriteOptions {
   /**
    * Enable the bulk insert for sql insert statement.
    */
+  @Deprecated

Review Comment:
   Please also add `.deprecatedAfter("0.14.0")` here and on all the deprecated 
configs below.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala:
##########
@@ -514,6 +520,29 @@ object DataSourceWriteOptions {
 
   val RECONCILE_SCHEMA: ConfigProperty[java.lang.Boolean] = 
HoodieCommonConfig.RECONCILE_SCHEMA
 
+  val SQL_WRITE_OPERATION: ConfigProperty[String] = ConfigProperty
+    .key("hoodie.sql.write.operation")
+    .defaultValue("insert")
+    .withDocumentation("Sql write operation to use with INSERT_INTO spark sql 
command. This comes with 3 possible values, bulk_insert, " +
+      "insert and upsert. bulk_insert is generally meant for initial loads and 
is known to be performant compared to insert. But bulk_insert may not " +
+      "do small file managmeent. If you prefer hudi to automatically managee 
small files, then you can go with \"insert\". There is no precombine " +
+      "(if there are duplicates within the same batch being ingested, same 
dups will be ingested) with bulk_insert and insert and there is no index " +
+      "look up as well. If you may use INSERT_INTO for mutable dataset, then 
you may have to set this config value to \"upsert\". With upsert, you will " +
+      "get both precombine and updates to existing records on storage is also 
honored. If not, you may see duplicates. ")
+
+  val NONE_INSERT_DUP_POLICY = "none"
+  val DROP_INSERT_DUP_POLICY = "drop"
+  val FAIL_INSERT_DUP_POLICY = "fail"
+
+  val INSERT_DUP_POLICY: ConfigProperty[String] = ConfigProperty
+    .key("hoodie.datasource.insert.dup.policy")

Review Comment:
   Does `hoodie.datasource.insert.dedupe.policy` sound better?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala:
##########
@@ -514,6 +520,29 @@ object DataSourceWriteOptions {
 
   val RECONCILE_SCHEMA: ConfigProperty[java.lang.Boolean] = 
HoodieCommonConfig.RECONCILE_SCHEMA
 
+  val SQL_WRITE_OPERATION: ConfigProperty[String] = ConfigProperty
+    .key("hoodie.sql.write.operation")
+    .defaultValue("insert")

Review Comment:
   can also add `.withValidValues("insert", "upsert", "bulk_insert")`.
   I'm also wondering whether we should allow delete as well. Ideally, we would want 
this to be similar to datasource write operations as much as possible.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala:
##########
@@ -514,6 +520,29 @@ object DataSourceWriteOptions {
 
   val RECONCILE_SCHEMA: ConfigProperty[java.lang.Boolean] = 
HoodieCommonConfig.RECONCILE_SCHEMA
 
+  val SQL_WRITE_OPERATION: ConfigProperty[String] = ConfigProperty
+    .key("hoodie.sql.write.operation")
+    .defaultValue("insert")
+    .withDocumentation("Sql write operation to use with INSERT_INTO spark sql 
command. This comes with 3 possible values, bulk_insert, " +
+      "insert and upsert. bulk_insert is generally meant for initial loads and 
is known to be performant compared to insert. But bulk_insert may not " +
+      "do small file managmeent. If you prefer hudi to automatically managee 
small files, then you can go with \"insert\". There is no precombine " +
+      "(if there are duplicates within the same batch being ingested, same 
dups will be ingested) with bulk_insert and insert and there is no index " +
+      "look up as well. If you may use INSERT_INTO for mutable dataset, then 
you may have to set this config value to \"upsert\". With upsert, you will " +
+      "get both precombine and updates to existing records on storage is also 
honored. If not, you may see duplicates. ")
+
+  val NONE_INSERT_DUP_POLICY = "none"
+  val DROP_INSERT_DUP_POLICY = "drop"
+  val FAIL_INSERT_DUP_POLICY = "fail"
+
+  val INSERT_DUP_POLICY: ConfigProperty[String] = ConfigProperty
+    .key("hoodie.datasource.insert.dup.policy")
+    .defaultValue(NONE_INSERT_DUP_POLICY)

Review Comment:
   can also add `.withValidValues(NONE_INSERT_DUP_POLICY, 
DROP_INSERT_DUP_POLICY, FAIL_INSERT_DUP_POLICY)`



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala:
##########
@@ -187,29 +218,38 @@ trait ProvidesHoodieConfig extends Logging {
 
     val insertModeOpt = combinedOpts.get(SQL_INSERT_MODE.key)
     val insertModeSet = insertModeOpt.nonEmpty
+    val sqlWriteOperationOpt = combinedOpts.get(SQL_WRITE_OPERATION.key())
+    val sqlWriteOperationSet = sqlWriteOperationOpt.nonEmpty
+    val sqlWriteOperation = 
sqlWriteOperationOpt.getOrElse(SQL_WRITE_OPERATION.defaultValue())
+    val insertDupPolicy = combinedOpts.getOrElse(INSERT_DUP_POLICY.key(), 
INSERT_DUP_POLICY.defaultValue())
     val insertMode = 
InsertMode.of(insertModeOpt.getOrElse(SQL_INSERT_MODE.defaultValue()))
     val isNonStrictMode = insertMode == InsertMode.NON_STRICT
     val isPartitionedTable = hoodieCatalogTable.partitionFields.nonEmpty
     val combineBeforeInsert = hoodieCatalogTable.preCombineKey.nonEmpty && 
hoodieCatalogTable.primaryKeys.nonEmpty
 
-    // NOTE: Target operation could be overridden by the user, therefore if it 
has been provided as an input
-    //       we'd prefer that value over auto-deduced operation. Otherwise, we 
deduce target operation type
+    // try to use sql write operation instead of legacy insert mode. If only 
insert mode is explicitly specified, we will uze
+    // o
+    val useLegacyInsertModeFlow = insertModeSet && !sqlWriteOperationSet

Review Comment:
   What if the user has also set `DataSourceWriteOptions.OPERATION`? I think we 
may have to reconcile with that.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala:
##########
@@ -187,29 +218,38 @@ trait ProvidesHoodieConfig extends Logging {
 
     val insertModeOpt = combinedOpts.get(SQL_INSERT_MODE.key)
     val insertModeSet = insertModeOpt.nonEmpty
+    val sqlWriteOperationOpt = combinedOpts.get(SQL_WRITE_OPERATION.key())
+    val sqlWriteOperationSet = sqlWriteOperationOpt.nonEmpty
+    val sqlWriteOperation = 
sqlWriteOperationOpt.getOrElse(SQL_WRITE_OPERATION.defaultValue())
+    val insertDupPolicy = combinedOpts.getOrElse(INSERT_DUP_POLICY.key(), 
INSERT_DUP_POLICY.defaultValue())
     val insertMode = 
InsertMode.of(insertModeOpt.getOrElse(SQL_INSERT_MODE.defaultValue()))
     val isNonStrictMode = insertMode == InsertMode.NON_STRICT
     val isPartitionedTable = hoodieCatalogTable.partitionFields.nonEmpty
     val combineBeforeInsert = hoodieCatalogTable.preCombineKey.nonEmpty && 
hoodieCatalogTable.primaryKeys.nonEmpty
 
-    // NOTE: Target operation could be overridden by the user, therefore if it 
has been provided as an input
-    //       we'd prefer that value over auto-deduced operation. Otherwise, we 
deduce target operation type
+    // try to use sql write operation instead of legacy insert mode. If only 
insert mode is explicitly specified, we will uze
+    // o

Review Comment:
   looks incomplete. please fix the comment.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala:
##########
@@ -112,6 +113,36 @@ trait ProvidesHoodieConfig extends Logging {
     }
   }
 
+  private def deducePayloadClassNameLegacy(operation: String, tableType: 
String, insertMode: InsertMode): String = {
+    if (operation == UPSERT_OPERATION_OPT_VAL &&
+      tableType == COW_TABLE_TYPE_OPT_VAL && insertMode == InsertMode.STRICT) {
+      // Validate duplicate key for COW, for MOR it will do the merge with the 
DefaultHoodieRecordPayload
+      // on reading.
+      // TODO use HoodieSparkValidateDuplicateKeyRecordMerger when 
SparkRecordMerger is default
+      classOf[ValidateDuplicateKeyPayload].getCanonicalName
+    } else if (operation == INSERT_OPERATION_OPT_VAL && tableType == 
COW_TABLE_TYPE_OPT_VAL &&
+      insertMode == InsertMode.STRICT){
+      // Validate duplicate key for inserts to COW table when using strict 
insert mode.
+      classOf[ValidateDuplicateKeyPayload].getCanonicalName
+    } else {
+      classOf[OverwriteWithLatestAvroPayload].getCanonicalName
+    }

Review Comment:
   We can consider that in a separate patch. This is just a refactoring to a 
separate method.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -1094,6 +1094,11 @@ object HoodieSparkSqlWriter {
     if (mergedParams.contains(PRECOMBINE_FIELD.key())) {
       mergedParams.put(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, 
mergedParams(PRECOMBINE_FIELD.key()))
     }
+    if (mergedParams.get(OPERATION.key()).get == INSERT_OPERATION_OPT_VAL && 
mergedParams.contains(DataSourceWriteOptions.INSERT_DUP_POLICY.key())
+      && mergedParams.get(DataSourceWriteOptions.INSERT_DUP_POLICY.key()).get 
!= FAIL_INSERT_DUP_POLICY) {
+      // enable merge allow duplicates when operation type is insert
+      
mergedParams.put(HoodieWriteConfig.MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE.key(),
 "true")

Review Comment:
   i generally agree with this point but i think we want to keep the default 
backwards compatible.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala:
##########
@@ -187,29 +218,38 @@ trait ProvidesHoodieConfig extends Logging {
 
     val insertModeOpt = combinedOpts.get(SQL_INSERT_MODE.key)
     val insertModeSet = insertModeOpt.nonEmpty
+    val sqlWriteOperationOpt = combinedOpts.get(SQL_WRITE_OPERATION.key())
+    val sqlWriteOperationSet = sqlWriteOperationOpt.nonEmpty
+    val sqlWriteOperation = 
sqlWriteOperationOpt.getOrElse(SQL_WRITE_OPERATION.defaultValue())
+    val insertDupPolicy = combinedOpts.getOrElse(INSERT_DUP_POLICY.key(), 
INSERT_DUP_POLICY.defaultValue())
     val insertMode = 
InsertMode.of(insertModeOpt.getOrElse(SQL_INSERT_MODE.defaultValue()))
     val isNonStrictMode = insertMode == InsertMode.NON_STRICT
     val isPartitionedTable = hoodieCatalogTable.partitionFields.nonEmpty
     val combineBeforeInsert = hoodieCatalogTable.preCombineKey.nonEmpty && 
hoodieCatalogTable.primaryKeys.nonEmpty
 
-    // NOTE: Target operation could be overridden by the user, therefore if it 
has been provided as an input
-    //       we'd prefer that value over auto-deduced operation. Otherwise, we 
deduce target operation type
+    // try to use sql write operation instead of legacy insert mode. If only 
insert mode is explicitly specified, we will uze
+    // o
+    val useLegacyInsertModeFlow = insertModeSet && !sqlWriteOperationSet
     val operation = combinedOpts.getOrElse(OPERATION.key,
+      if (useLegacyInsertModeFlow) {
+        // NOTE: Target operation could be overridden by the user, therefore 
if it has been provided as an input
+        //       we'd prefer that value over auto-deduced operation. 
Otherwise, we deduce target operation type
       deduceWriteOperationForInsertInfo(isPartitionedTable, 
isOverwritePartition, isOverwriteTable, insertModeSet, dropDuplicate,
-        enableBulkInsert, isInsertInto, isNonStrictMode, combineBeforeInsert))
+        enableBulkInsert, isInsertInto, isNonStrictMode, combineBeforeInsert)
+      } else {
+        deduceSqlWriteOperation(isOverwritePartition, isOverwriteTable, 
sqlWriteOperation)
+      }
+    )
 
-    val payloadClassName = if (operation == UPSERT_OPERATION_OPT_VAL &&
-      tableType == COW_TABLE_TYPE_OPT_VAL && insertMode == InsertMode.STRICT) {
-      // Validate duplicate key for COW, for MOR it will do the merge with the 
DefaultHoodieRecordPayload
-      // on reading.
-      // TODO use HoodieSparkValidateDuplicateKeyRecordMerger when 
SparkRecordMerger is default
-      classOf[ValidateDuplicateKeyPayload].getCanonicalName
-    } else if (operation == INSERT_OPERATION_OPT_VAL && tableType == 
COW_TABLE_TYPE_OPT_VAL &&
-      insertMode == InsertMode.STRICT){
-      // Validate duplicate key for inserts to COW table when using strict 
insert mode.
-      classOf[ValidateDuplicateKeyPayload].getCanonicalName
+    val payloadClassName =  if (useLegacyInsertModeFlow) {
+      deducePayloadClassNameLegacy(operation, tableType, insertMode)
     } else {
-      classOf[OverwriteWithLatestAvroPayload].getCanonicalName
+      // should we also consider old way of doing things.

Review Comment:
   i think we should. we can change the behavior in 1.x. But, in 0.14.0, we 
should map the previous config value to the new config value, e.g. `STRICT` is 
equivalent to `FAIL_INSERT_DUP_POLICY`.



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala:
##########
@@ -1499,4 +1500,341 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
       assertEquals(3, 
df.select(HoodieRecord.RECORD_KEY_METADATA_FIELD).count())
     }
   }
+
+  /**
+   * When neither of strict mode nor sql.write.operation is set, sql write 
operation takes precedence and default value is chosen.
+   */
+  test("Test sql write operation with INSERT_INTO No explicit configs") {
+      withRecordType()(withTempDir { tmp =>
+        Seq("cow","mor").foreach {tableType =>
+          withTable(generateTableName) { tableName =>
+            ingestAndValidateData(tableType, tableName, tmp)
+          }
+        }
+      })
+  }
+
+  test("Test sql write operation with INSERT_INTO override both strict mode 
and sql write operation") {
+    withRecordType()(withTempDir { tmp =>
+      Seq("cow","mor").foreach { tableType =>
+        Seq(WriteOperationType.INSERT, WriteOperationType.BULK_INSERT, 
WriteOperationType.UPSERT).foreach { operation =>
+          withTable(generateTableName) { tableName =>
+            ingestAndValidateData(tableType, tableName, tmp, operation,
+              List("set hoodie.sql.write.operation = " + operation.value(), 
"set hoodie.sql.insert.mode = upsert"))
+          }
+        }
+      }
+    })
+  }
+
+  test("Test sql write operation with INSERT_INTO override only sql write 
operation") {
+    withRecordType()(withTempDir { tmp =>
+      Seq("cow","mor").foreach {tableType =>
+        Seq(WriteOperationType.INSERT, WriteOperationType.BULK_INSERT, 
WriteOperationType.UPSERT).foreach { operation =>
+          withTable(generateTableName) { tableName =>
+            ingestAndValidateData(tableType, tableName, tmp, operation,
+              List("set hoodie.sql.write.operation = " + operation.value()))
+          }
+        }
+      }
+    })
+  }
+
+  test("Test sql write operation with INSERT_INTO override only strict mode") {
+    withRecordType()(withTempDir { tmp =>
+      Seq("cow","mor").foreach {tableType =>
+        withTable(generateTableName) { tableName =>
+          ingestAndValidateData(tableType, tableName, tmp, 
WriteOperationType.UPSERT,
+            List("set hoodie.sql.insert.mode = upsert"))
+        }
+      }
+    })
+  }
+
+  def ingestAndValidateData(tableType: String, tableName: String, tmp: File,
+                            expectedOperationtype: WriteOperationType = 
WriteOperationType.INSERT,
+                            setOptions: List[String] = List.empty) : Unit = {
+    spark.sql(
+      s"""
+         |create table $tableName (
+         |  id int,
+         |  name string,
+         |  price double,
+         |  dt string
+         |) using hudi
+         | tblproperties (
+         |  type = '$tableType',
+         |  primaryKey = 'id',
+         |  preCombine = 'name'
+         | )
+         | partitioned by (dt)
+         | location '${tmp.getCanonicalPath}/$tableName'
+         """.stripMargin)
+    setOptions.foreach(entry => {
+      spark.sql(entry)
+    })
+
+    spark.sql(s"insert into $tableName values(1, 'a1', 10, '2021-07-18')")
+
+    assertResult(expectedOperationtype) {
+      getLastCommitMetadata(spark, 
s"${tmp.getCanonicalPath}/$tableName").getOperationType
+    }
+    checkAnswer(s"select id, name, price, dt from $tableName")(
+      Seq(1, "a1", 10.0, "2021-07-18")
+    )
+
+    // insert record again but w/ diff values but same primary key. Since 
"insert" is chosen as operation type,
+    // dups should be seen w/ snapshot query
+    spark.sql(
+      s"""
+         | insert into $tableName values
+         | (1, 'a1_1', 10, "2021-07-18"),
+         | (2, 'a2', 20, "2021-07-18"),
+         | (2, 'a2_2', 30, "2021-07-18")
+              """.stripMargin)
+
+    assertResult(expectedOperationtype) {
+      getLastCommitMetadata(spark, 
s"${tmp.getCanonicalPath}/$tableName").getOperationType
+    }
+    if (expectedOperationtype == WriteOperationType.UPSERT) {
+      // dedup should happen within same batch being ingested and existing 
records on storage should get updated
+      checkAnswer(s"select id, name, price, dt from $tableName order by id")(
+        Seq(1, "a1_1", 10.0, "2021-07-18"),
+        Seq(2, "a2_2", 30.0, "2021-07-18")
+      )
+    } else {
+      // no dedup across batches
+      checkAnswer(s"select id, name, price, dt from $tableName order by id")(
+        Seq(1, "a1", 10.0, "2021-07-18"),
+        Seq(1, "a1_1", 10.0, "2021-07-18"),
+        // Seq(2, "a2", 20.0, "2021-07-18"), // preCombine within same batch 
kicks in if preCombine is set
+        Seq(2, "a2_2", 30.0, "2021-07-18")
+      )
+    }
+    spark.sessionState.conf.unsetConf("hoodie.sql.write.operation")
+    spark.sessionState.conf.unsetConf("hoodie.sql.insert.mode")
+  }
+
+  test("Test insert dup policy with INSERT_INTO explicit new configs INSERT 
operation ") {
+    withRecordType()(withTempDir { tmp =>
+      Seq("cow","mor").foreach {tableType =>
+        val operation = WriteOperationType.INSERT
+          Seq(NONE_INSERT_DUP_POLICY, DROP_INSERT_DUP_POLICY).foreach { 
dupPolicy =>
+          withTable(generateTableName) { tableName =>
+            ingestAndValidateDataDupPolicy(tableType, tableName, tmp, 
operation,
+              List("set hoodie.sql.write.operation = " + operation.value(), 
"set " + DataSourceWriteOptions.INSERT_DUP_POLICY.key() + " = " + dupPolicy),
+              dupPolicy)
+          }
+        }
+      }
+    })
+  }
+
+  test("Test insert dup policy with INSERT_INTO explicit new configs 
BULK_INSERT operation ") {
+    withRecordType()(withTempDir { tmp =>
+      Seq("cow").foreach {tableType =>
+        val operation = WriteOperationType.BULK_INSERT
+        val dupPolicy = NONE_INSERT_DUP_POLICY
+            withTable(generateTableName) { tableName =>
+              ingestAndValidateDataDupPolicy(tableType, tableName, tmp, 
operation,
+                List("set hoodie.sql.write.operation = " + operation.value(), 
"set " + DataSourceWriteOptions.INSERT_DUP_POLICY.key() + " = " + dupPolicy),
+                dupPolicy)
+            }
+      }
+    })
+  }
+
+  test("Test DROP insert dup policy with INSERT_INTO explicit new configs BULK 
INSERT operation") {
+    withRecordType(Seq(HoodieRecordType.AVRO))(withTempDir { tmp =>
+      Seq("cow").foreach {tableType =>
+        val operation = WriteOperationType.BULK_INSERT
+        val dupPolicy = DROP_INSERT_DUP_POLICY
+        withTable(generateTableName) { tableName =>
+          ingestAndValidateDataDupPolicy(tableType, tableName, tmp, operation,
+            List("set hoodie.sql.write.operation = " + operation.value(), "set 
" + DataSourceWriteOptions.INSERT_DUP_POLICY.key() + " = " + dupPolicy),
+            dupPolicy)
+        }
+      }
+    })
+  }
+
+  test("Test FAIL insert dup policy with INSERT_INTO explicit new configs") {
+    withRecordType(Seq(HoodieRecordType.AVRO))(withTempDir { tmp =>
+      Seq("cow").foreach {tableType =>
+        val operation = WriteOperationType.UPSERT
+        val dupPolicy = FAIL_INSERT_DUP_POLICY
+            withTable(generateTableName) { tableName =>
+              ingestAndValidateDataDupPolicy(tableType, tableName, tmp, 
operation,
+                List("set hoodie.sql.write.operation = " + operation.value(), 
"set " + DataSourceWriteOptions.INSERT_DUP_POLICY.key() + " = " + dupPolicy),
+                dupPolicy, true)
+            }
+          }
+    })
+  }
+
+  def ingestAndValidateDataDupPolicy(tableType: String, tableName: String, 
tmp: File,
+                            expectedOperationtype: WriteOperationType = 
WriteOperationType.INSERT,
+                            setOptions: List[String] = List.empty, 
insertDupPolicy : String = NONE_INSERT_DUP_POLICY,
+                                    expectExceptionOnSecondBatch: Boolean = 
false) : Unit = {
+    spark.sql(
+      s"""
+         |create table $tableName (
+         |  id int,
+         |  name string,
+         |  price double,
+         |  dt string
+         |) using hudi
+         | tblproperties (
+         |  type = '$tableType',
+         |  primaryKey = 'id',
+         |  preCombine = 'name'
+         | )
+         | partitioned by (dt)
+         | location '${tmp.getCanonicalPath}/$tableName'
+         """.stripMargin)
+    // set additional options
+    setOptions.foreach(entry => {
+      spark.sql(entry)
+    })
+
+    spark.sql(s"insert into $tableName values(1, 'a1', 10, '2021-07-18')")
+
+    assertResult(expectedOperationtype) {
+      getLastCommitMetadata(spark, 
s"${tmp.getCanonicalPath}/$tableName").getOperationType
+    }
+    checkAnswer(s"select id, name, price, dt from $tableName")(
+      Seq(1, "a1", 10.0, "2021-07-18")
+    )
+
+    if (expectExceptionOnSecondBatch) {
+      assertThrows[HoodieDuplicateKeyException] {
+        try {
+          spark.sql(
+            s"""
+               | insert into $tableName values
+               | (1, 'a1_1', 10, "2021-07-18"),
+               | (2, 'a2', 20, "2021-07-18"),
+               | (2, 'a2_2', 30, "2021-07-18")
+              """.stripMargin)
+        } catch {
+          case e: Exception =>
+            var root: Throwable = e
+            while (root.getCause != null) {
+              root = root.getCause
+            }
+            throw root
+        }
+      }
+    } else {
+
+      // insert record again but w/ diff values but same primary key. Since 
"insert" is chosen as operation type,
+      // dups should be seen w/ snapshot query
+      spark.sql(
+        s"""
+           | insert into $tableName values
+           | (1, 'a1_1', 10, "2021-07-18"),
+           | (2, 'a2', 20, "2021-07-18"),
+           | (2, 'a2_2', 30, "2021-07-18")
+              """.stripMargin)
+
+      assertResult(expectedOperationtype) {
+        getLastCommitMetadata(spark, 
s"${tmp.getCanonicalPath}/$tableName").getOperationType
+      }
+
+      if (expectedOperationtype == WriteOperationType.UPSERT) {
+        // dedup should happen within same batch being ingested and existing 
records on storage should get updated
+        checkAnswer(s"select id, name, price, dt from $tableName order by id")(
+          Seq(1, "a1_1", 10.0, "2021-07-18"),
+          Seq(2, "a2_2", 30.0, "2021-07-18")
+        )
+      } else {
+        if (insertDupPolicy == NONE_INSERT_DUP_POLICY) {
+          // no dedup across batches
+          checkAnswer(s"select id, name, price, dt from $tableName order by 
id")(
+            Seq(1, "a1", 10.0, "2021-07-18"),
+            Seq(1, "a1_1", 10.0, "2021-07-18"),
+            // Seq(2, "a2", 20.0, "2021-07-18"), // preCombine within same 
batch kicks in if preCombine is set
+            Seq(2, "a2_2", 30.0, "2021-07-18")
+          )
+        } else if (insertDupPolicy == DROP_INSERT_DUP_POLICY) {
+          checkAnswer(s"select id, name, price, dt from $tableName order by 
id")(
+            Seq(1, "a1", 10.0, "2021-07-18"),
+            // Seq(2, "a2", 20.0, "2021-07-18"), // preCombine within same 
batch kicks in if preCombine is set
+            Seq(2, "a2_2", 30.0, "2021-07-18")
+          )
+        }
+      }
+    }
+    spark.sessionState.conf.unsetConf("hoodie.sql.write.operation")
+    spark.sessionState.conf.unsetConf("hoodie.sql.insert.mode")
+    spark.sessionState.conf.unsetConf("hoodie.datasource.insert.dup.policy")
+  }
+
+  def ingestAndValidateExceptionBulkInsertAndDropDupPolicy(tableType: String, 
tableName: String, tmp: File,

Review Comment:
   can remove this method. i don't see it being used.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to