codope commented on code in PR #9123:
URL: https://github.com/apache/hudi/pull/9123#discussion_r1252918477
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala:
##########
@@ -430,19 +430,23 @@ object DataSourceWriteOptions {
/**
* Enable the bulk insert for sql insert statement.
*/
+ @Deprecated
Review Comment:
please also mark `.deprecatedAfter("0.14.0")` here and for all below
deprecated configs
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala:
##########
@@ -514,6 +520,29 @@ object DataSourceWriteOptions {
val RECONCILE_SCHEMA: ConfigProperty[java.lang.Boolean] =
HoodieCommonConfig.RECONCILE_SCHEMA
+ val SQL_WRITE_OPERATION: ConfigProperty[String] = ConfigProperty
+ .key("hoodie.sql.write.operation")
+ .defaultValue("insert")
+ .withDocumentation("Sql write operation to use with INSERT_INTO spark sql
command. This comes with 3 possible values, bulk_insert, " +
+ "insert and upsert. bulk_insert is generally meant for initial loads and
is known to be performant compared to insert. But bulk_insert may not " +
+ "do small file managmeent. If you prefer hudi to automatically managee
small files, then you can go with \"insert\". There is no precombine " +
+ "(if there are duplicates within the same batch being ingested, same
dups will be ingested) with bulk_insert and insert and there is no index " +
+ "look up as well. If you may use INSERT_INTO for mutable dataset, then
you may have to set this config value to \"upsert\". With upsert, you will " +
+ "get both precombine and updates to existing records on storage is also
honored. If not, you may see duplicates. ")
+
+ val NONE_INSERT_DUP_POLICY = "none"
+ val DROP_INSERT_DUP_POLICY = "drop"
+ val FAIL_INSERT_DUP_POLICY = "fail"
+
+ val INSERT_DUP_POLICY: ConfigProperty[String] = ConfigProperty
+ .key("hoodie.datasource.insert.dup.policy")
Review Comment:
does `hoodie.datasource.insert.dedupe.policy` sound better?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala:
##########
@@ -514,6 +520,29 @@ object DataSourceWriteOptions {
val RECONCILE_SCHEMA: ConfigProperty[java.lang.Boolean] =
HoodieCommonConfig.RECONCILE_SCHEMA
+ val SQL_WRITE_OPERATION: ConfigProperty[String] = ConfigProperty
+ .key("hoodie.sql.write.operation")
+ .defaultValue("insert")
Review Comment:
Can also add `.withValidValues("insert", "upsert", "bulk_insert")`.
I'm also wondering whether we should allow delete as well? Ideally, we would
want this to be as similar to the datasource write operations as possible.
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala:
##########
@@ -514,6 +520,29 @@ object DataSourceWriteOptions {
val RECONCILE_SCHEMA: ConfigProperty[java.lang.Boolean] =
HoodieCommonConfig.RECONCILE_SCHEMA
+ val SQL_WRITE_OPERATION: ConfigProperty[String] = ConfigProperty
+ .key("hoodie.sql.write.operation")
+ .defaultValue("insert")
+ .withDocumentation("Sql write operation to use with INSERT_INTO spark sql
command. This comes with 3 possible values, bulk_insert, " +
+ "insert and upsert. bulk_insert is generally meant for initial loads and
is known to be performant compared to insert. But bulk_insert may not " +
+ "do small file managmeent. If you prefer hudi to automatically managee
small files, then you can go with \"insert\". There is no precombine " +
+ "(if there are duplicates within the same batch being ingested, same
dups will be ingested) with bulk_insert and insert and there is no index " +
+ "look up as well. If you may use INSERT_INTO for mutable dataset, then
you may have to set this config value to \"upsert\". With upsert, you will " +
+ "get both precombine and updates to existing records on storage is also
honored. If not, you may see duplicates. ")
+
+ val NONE_INSERT_DUP_POLICY = "none"
+ val DROP_INSERT_DUP_POLICY = "drop"
+ val FAIL_INSERT_DUP_POLICY = "fail"
+
+ val INSERT_DUP_POLICY: ConfigProperty[String] = ConfigProperty
+ .key("hoodie.datasource.insert.dup.policy")
+ .defaultValue(NONE_INSERT_DUP_POLICY)
Review Comment:
can also add `.withValidValues(NONE_INSERT_DUP_POLICY,
DROP_INSERT_DUP_POLICY, FAIL_INSERT_DUP_POLICY)`
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala:
##########
@@ -187,29 +218,38 @@ trait ProvidesHoodieConfig extends Logging {
val insertModeOpt = combinedOpts.get(SQL_INSERT_MODE.key)
val insertModeSet = insertModeOpt.nonEmpty
+ val sqlWriteOperationOpt = combinedOpts.get(SQL_WRITE_OPERATION.key())
+ val sqlWriteOperationSet = sqlWriteOperationOpt.nonEmpty
+ val sqlWriteOperation =
sqlWriteOperationOpt.getOrElse(SQL_WRITE_OPERATION.defaultValue())
+ val insertDupPolicy = combinedOpts.getOrElse(INSERT_DUP_POLICY.key(),
INSERT_DUP_POLICY.defaultValue())
val insertMode =
InsertMode.of(insertModeOpt.getOrElse(SQL_INSERT_MODE.defaultValue()))
val isNonStrictMode = insertMode == InsertMode.NON_STRICT
val isPartitionedTable = hoodieCatalogTable.partitionFields.nonEmpty
val combineBeforeInsert = hoodieCatalogTable.preCombineKey.nonEmpty &&
hoodieCatalogTable.primaryKeys.nonEmpty
- // NOTE: Target operation could be overridden by the user, therefore if it
has been provided as an input
- // we'd prefer that value over auto-deduced operation. Otherwise, we
deduce target operation type
+ // try to use sql write operation instead of legacy insert mode. If only
insert mode is explicitly specified, we will uze
+ // o
+ val useLegacyInsertModeFlow = insertModeSet && !sqlWriteOperationSet
Review Comment:
What if the user has also set `DataSourceWriteOperations.OPERATION`? I think we
may have to reconcile with that.
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala:
##########
@@ -187,29 +218,38 @@ trait ProvidesHoodieConfig extends Logging {
val insertModeOpt = combinedOpts.get(SQL_INSERT_MODE.key)
val insertModeSet = insertModeOpt.nonEmpty
+ val sqlWriteOperationOpt = combinedOpts.get(SQL_WRITE_OPERATION.key())
+ val sqlWriteOperationSet = sqlWriteOperationOpt.nonEmpty
+ val sqlWriteOperation =
sqlWriteOperationOpt.getOrElse(SQL_WRITE_OPERATION.defaultValue())
+ val insertDupPolicy = combinedOpts.getOrElse(INSERT_DUP_POLICY.key(),
INSERT_DUP_POLICY.defaultValue())
val insertMode =
InsertMode.of(insertModeOpt.getOrElse(SQL_INSERT_MODE.defaultValue()))
val isNonStrictMode = insertMode == InsertMode.NON_STRICT
val isPartitionedTable = hoodieCatalogTable.partitionFields.nonEmpty
val combineBeforeInsert = hoodieCatalogTable.preCombineKey.nonEmpty &&
hoodieCatalogTable.primaryKeys.nonEmpty
- // NOTE: Target operation could be overridden by the user, therefore if it
has been provided as an input
- // we'd prefer that value over auto-deduced operation. Otherwise, we
deduce target operation type
+ // try to use sql write operation instead of legacy insert mode. If only
insert mode is explicitly specified, we will uze
+ // o
Review Comment:
looks incomplete. please fix the comment.
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala:
##########
@@ -112,6 +113,36 @@ trait ProvidesHoodieConfig extends Logging {
}
}
+ private def deducePayloadClassNameLegacy(operation: String, tableType:
String, insertMode: InsertMode): String = {
+ if (operation == UPSERT_OPERATION_OPT_VAL &&
+ tableType == COW_TABLE_TYPE_OPT_VAL && insertMode == InsertMode.STRICT) {
+ // Validate duplicate key for COW, for MOR it will do the merge with the
DefaultHoodieRecordPayload
+ // on reading.
+ // TODO use HoodieSparkValidateDuplicateKeyRecordMerger when
SparkRecordMerger is default
+ classOf[ValidateDuplicateKeyPayload].getCanonicalName
+ } else if (operation == INSERT_OPERATION_OPT_VAL && tableType ==
COW_TABLE_TYPE_OPT_VAL &&
+ insertMode == InsertMode.STRICT){
+ // Validate duplicate key for inserts to COW table when using strict
insert mode.
+ classOf[ValidateDuplicateKeyPayload].getCanonicalName
+ } else {
+ classOf[OverwriteWithLatestAvroPayload].getCanonicalName
+ }
Review Comment:
We can consider that in a separate patch. This is just a refactoring to a
separate method.
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -1094,6 +1094,11 @@ object HoodieSparkSqlWriter {
if (mergedParams.contains(PRECOMBINE_FIELD.key())) {
mergedParams.put(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY,
mergedParams(PRECOMBINE_FIELD.key()))
}
+ if (mergedParams.get(OPERATION.key()).get == INSERT_OPERATION_OPT_VAL &&
mergedParams.contains(DataSourceWriteOptions.INSERT_DUP_POLICY.key())
+ && mergedParams.get(DataSourceWriteOptions.INSERT_DUP_POLICY.key()).get
!= FAIL_INSERT_DUP_POLICY) {
+ // enable merge allow duplicates when operation type is insert
+
mergedParams.put(HoodieWriteConfig.MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE.key(),
"true")
Review Comment:
I generally agree with this point, but I think we want to keep the default
backwards compatible.
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala:
##########
@@ -187,29 +218,38 @@ trait ProvidesHoodieConfig extends Logging {
val insertModeOpt = combinedOpts.get(SQL_INSERT_MODE.key)
val insertModeSet = insertModeOpt.nonEmpty
+ val sqlWriteOperationOpt = combinedOpts.get(SQL_WRITE_OPERATION.key())
+ val sqlWriteOperationSet = sqlWriteOperationOpt.nonEmpty
+ val sqlWriteOperation =
sqlWriteOperationOpt.getOrElse(SQL_WRITE_OPERATION.defaultValue())
+ val insertDupPolicy = combinedOpts.getOrElse(INSERT_DUP_POLICY.key(),
INSERT_DUP_POLICY.defaultValue())
val insertMode =
InsertMode.of(insertModeOpt.getOrElse(SQL_INSERT_MODE.defaultValue()))
val isNonStrictMode = insertMode == InsertMode.NON_STRICT
val isPartitionedTable = hoodieCatalogTable.partitionFields.nonEmpty
val combineBeforeInsert = hoodieCatalogTable.preCombineKey.nonEmpty &&
hoodieCatalogTable.primaryKeys.nonEmpty
- // NOTE: Target operation could be overridden by the user, therefore if it
has been provided as an input
- // we'd prefer that value over auto-deduced operation. Otherwise, we
deduce target operation type
+ // try to use sql write operation instead of legacy insert mode. If only
insert mode is explicitly specified, we will uze
+ // o
+ val useLegacyInsertModeFlow = insertModeSet && !sqlWriteOperationSet
val operation = combinedOpts.getOrElse(OPERATION.key,
+ if (useLegacyInsertModeFlow) {
+ // NOTE: Target operation could be overridden by the user, therefore
if it has been provided as an input
+ // we'd prefer that value over auto-deduced operation.
Otherwise, we deduce target operation type
deduceWriteOperationForInsertInfo(isPartitionedTable,
isOverwritePartition, isOverwriteTable, insertModeSet, dropDuplicate,
- enableBulkInsert, isInsertInto, isNonStrictMode, combineBeforeInsert))
+ enableBulkInsert, isInsertInto, isNonStrictMode, combineBeforeInsert)
+ } else {
+ deduceSqlWriteOperation(isOverwritePartition, isOverwriteTable,
sqlWriteOperation)
+ }
+ )
- val payloadClassName = if (operation == UPSERT_OPERATION_OPT_VAL &&
- tableType == COW_TABLE_TYPE_OPT_VAL && insertMode == InsertMode.STRICT) {
- // Validate duplicate key for COW, for MOR it will do the merge with the
DefaultHoodieRecordPayload
- // on reading.
- // TODO use HoodieSparkValidateDuplicateKeyRecordMerger when
SparkRecordMerger is default
- classOf[ValidateDuplicateKeyPayload].getCanonicalName
- } else if (operation == INSERT_OPERATION_OPT_VAL && tableType ==
COW_TABLE_TYPE_OPT_VAL &&
- insertMode == InsertMode.STRICT){
- // Validate duplicate key for inserts to COW table when using strict
insert mode.
- classOf[ValidateDuplicateKeyPayload].getCanonicalName
+ val payloadClassName = if (useLegacyInsertModeFlow) {
+ deducePayloadClassNameLegacy(operation, tableType, insertMode)
} else {
- classOf[OverwriteWithLatestAvroPayload].getCanonicalName
+ // should we also consider old way of doing things.
Review Comment:
I think we should. We can change the behavior in 1.x. But in 0.14.0, we
should map the previous config value to the new config value, e.g. `STRICT` is
equivalent to `FAIL_INSERT_DUP_POLICY`.
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala:
##########
@@ -1499,4 +1500,341 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
assertEquals(3,
df.select(HoodieRecord.RECORD_KEY_METADATA_FIELD).count())
}
}
+
+ /**
+ * When neither of strict mode nor sql.write.operation is set, sql write
operation takes precedence and default value is chosen.
+ */
+ test("Test sql write operation with INSERT_INTO No explicit configs") {
+ withRecordType()(withTempDir { tmp =>
+ Seq("cow","mor").foreach {tableType =>
+ withTable(generateTableName) { tableName =>
+ ingestAndValidateData(tableType, tableName, tmp)
+ }
+ }
+ })
+ }
+
+ test("Test sql write operation with INSERT_INTO override both strict mode
and sql write operation") {
+ withRecordType()(withTempDir { tmp =>
+ Seq("cow","mor").foreach { tableType =>
+ Seq(WriteOperationType.INSERT, WriteOperationType.BULK_INSERT,
WriteOperationType.UPSERT).foreach { operation =>
+ withTable(generateTableName) { tableName =>
+ ingestAndValidateData(tableType, tableName, tmp, operation,
+ List("set hoodie.sql.write.operation = " + operation.value(),
"set hoodie.sql.insert.mode = upsert"))
+ }
+ }
+ }
+ })
+ }
+
+ test("Test sql write operation with INSERT_INTO override only sql write
operation") {
+ withRecordType()(withTempDir { tmp =>
+ Seq("cow","mor").foreach {tableType =>
+ Seq(WriteOperationType.INSERT, WriteOperationType.BULK_INSERT,
WriteOperationType.UPSERT).foreach { operation =>
+ withTable(generateTableName) { tableName =>
+ ingestAndValidateData(tableType, tableName, tmp, operation,
+ List("set hoodie.sql.write.operation = " + operation.value()))
+ }
+ }
+ }
+ })
+ }
+
+ test("Test sql write operation with INSERT_INTO override only strict mode") {
+ withRecordType()(withTempDir { tmp =>
+ Seq("cow","mor").foreach {tableType =>
+ withTable(generateTableName) { tableName =>
+ ingestAndValidateData(tableType, tableName, tmp,
WriteOperationType.UPSERT,
+ List("set hoodie.sql.insert.mode = upsert"))
+ }
+ }
+ })
+ }
+
+ def ingestAndValidateData(tableType: String, tableName: String, tmp: File,
+ expectedOperationtype: WriteOperationType =
WriteOperationType.INSERT,
+ setOptions: List[String] = List.empty) : Unit = {
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | dt string
+ |) using hudi
+ | tblproperties (
+ | type = '$tableType',
+ | primaryKey = 'id',
+ | preCombine = 'name'
+ | )
+ | partitioned by (dt)
+ | location '${tmp.getCanonicalPath}/$tableName'
+ """.stripMargin)
+ setOptions.foreach(entry => {
+ spark.sql(entry)
+ })
+
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, '2021-07-18')")
+
+ assertResult(expectedOperationtype) {
+ getLastCommitMetadata(spark,
s"${tmp.getCanonicalPath}/$tableName").getOperationType
+ }
+ checkAnswer(s"select id, name, price, dt from $tableName")(
+ Seq(1, "a1", 10.0, "2021-07-18")
+ )
+
+ // insert record again but w/ diff values but same primary key. Since
"insert" is chosen as operation type,
+ // dups should be seen w/ snapshot query
+ spark.sql(
+ s"""
+ | insert into $tableName values
+ | (1, 'a1_1', 10, "2021-07-18"),
+ | (2, 'a2', 20, "2021-07-18"),
+ | (2, 'a2_2', 30, "2021-07-18")
+ """.stripMargin)
+
+ assertResult(expectedOperationtype) {
+ getLastCommitMetadata(spark,
s"${tmp.getCanonicalPath}/$tableName").getOperationType
+ }
+ if (expectedOperationtype == WriteOperationType.UPSERT) {
+ // dedup should happen within same batch being ingested and existing
records on storage should get updated
+ checkAnswer(s"select id, name, price, dt from $tableName order by id")(
+ Seq(1, "a1_1", 10.0, "2021-07-18"),
+ Seq(2, "a2_2", 30.0, "2021-07-18")
+ )
+ } else {
+ // no dedup across batches
+ checkAnswer(s"select id, name, price, dt from $tableName order by id")(
+ Seq(1, "a1", 10.0, "2021-07-18"),
+ Seq(1, "a1_1", 10.0, "2021-07-18"),
+ // Seq(2, "a2", 20.0, "2021-07-18"), // preCombine within same batch
kicks in if preCombine is set
+ Seq(2, "a2_2", 30.0, "2021-07-18")
+ )
+ }
+ spark.sessionState.conf.unsetConf("hoodie.sql.write.operation")
+ spark.sessionState.conf.unsetConf("hoodie.sql.insert.mode")
+ }
+
+ test("Test insert dup policy with INSERT_INTO explicit new configs INSERT
operation ") {
+ withRecordType()(withTempDir { tmp =>
+ Seq("cow","mor").foreach {tableType =>
+ val operation = WriteOperationType.INSERT
+ Seq(NONE_INSERT_DUP_POLICY, DROP_INSERT_DUP_POLICY).foreach {
dupPolicy =>
+ withTable(generateTableName) { tableName =>
+ ingestAndValidateDataDupPolicy(tableType, tableName, tmp,
operation,
+ List("set hoodie.sql.write.operation = " + operation.value(),
"set " + DataSourceWriteOptions.INSERT_DUP_POLICY.key() + " = " + dupPolicy),
+ dupPolicy)
+ }
+ }
+ }
+ })
+ }
+
+ test("Test insert dup policy with INSERT_INTO explicit new configs
BULK_INSERT operation ") {
+ withRecordType()(withTempDir { tmp =>
+ Seq("cow").foreach {tableType =>
+ val operation = WriteOperationType.BULK_INSERT
+ val dupPolicy = NONE_INSERT_DUP_POLICY
+ withTable(generateTableName) { tableName =>
+ ingestAndValidateDataDupPolicy(tableType, tableName, tmp,
operation,
+ List("set hoodie.sql.write.operation = " + operation.value(),
"set " + DataSourceWriteOptions.INSERT_DUP_POLICY.key() + " = " + dupPolicy),
+ dupPolicy)
+ }
+ }
+ })
+ }
+
+ test("Test DROP insert dup policy with INSERT_INTO explicit new configs BULK
INSERT operation") {
+ withRecordType(Seq(HoodieRecordType.AVRO))(withTempDir { tmp =>
+ Seq("cow").foreach {tableType =>
+ val operation = WriteOperationType.BULK_INSERT
+ val dupPolicy = DROP_INSERT_DUP_POLICY
+ withTable(generateTableName) { tableName =>
+ ingestAndValidateDataDupPolicy(tableType, tableName, tmp, operation,
+ List("set hoodie.sql.write.operation = " + operation.value(), "set
" + DataSourceWriteOptions.INSERT_DUP_POLICY.key() + " = " + dupPolicy),
+ dupPolicy)
+ }
+ }
+ })
+ }
+
+ test("Test FAIL insert dup policy with INSERT_INTO explicit new configs") {
+ withRecordType(Seq(HoodieRecordType.AVRO))(withTempDir { tmp =>
+ Seq("cow").foreach {tableType =>
+ val operation = WriteOperationType.UPSERT
+ val dupPolicy = FAIL_INSERT_DUP_POLICY
+ withTable(generateTableName) { tableName =>
+ ingestAndValidateDataDupPolicy(tableType, tableName, tmp,
operation,
+ List("set hoodie.sql.write.operation = " + operation.value(),
"set " + DataSourceWriteOptions.INSERT_DUP_POLICY.key() + " = " + dupPolicy),
+ dupPolicy, true)
+ }
+ }
+ })
+ }
+
+ def ingestAndValidateDataDupPolicy(tableType: String, tableName: String,
tmp: File,
+ expectedOperationtype: WriteOperationType =
WriteOperationType.INSERT,
+ setOptions: List[String] = List.empty,
insertDupPolicy : String = NONE_INSERT_DUP_POLICY,
+ expectExceptionOnSecondBatch: Boolean =
false) : Unit = {
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | dt string
+ |) using hudi
+ | tblproperties (
+ | type = '$tableType',
+ | primaryKey = 'id',
+ | preCombine = 'name'
+ | )
+ | partitioned by (dt)
+ | location '${tmp.getCanonicalPath}/$tableName'
+ """.stripMargin)
+ // set additional options
+ setOptions.foreach(entry => {
+ spark.sql(entry)
+ })
+
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, '2021-07-18')")
+
+ assertResult(expectedOperationtype) {
+ getLastCommitMetadata(spark,
s"${tmp.getCanonicalPath}/$tableName").getOperationType
+ }
+ checkAnswer(s"select id, name, price, dt from $tableName")(
+ Seq(1, "a1", 10.0, "2021-07-18")
+ )
+
+ if (expectExceptionOnSecondBatch) {
+ assertThrows[HoodieDuplicateKeyException] {
+ try {
+ spark.sql(
+ s"""
+ | insert into $tableName values
+ | (1, 'a1_1', 10, "2021-07-18"),
+ | (2, 'a2', 20, "2021-07-18"),
+ | (2, 'a2_2', 30, "2021-07-18")
+ """.stripMargin)
+ } catch {
+ case e: Exception =>
+ var root: Throwable = e
+ while (root.getCause != null) {
+ root = root.getCause
+ }
+ throw root
+ }
+ }
+ } else {
+
+ // insert record again but w/ diff values but same primary key. Since
"insert" is chosen as operation type,
+ // dups should be seen w/ snapshot query
+ spark.sql(
+ s"""
+ | insert into $tableName values
+ | (1, 'a1_1', 10, "2021-07-18"),
+ | (2, 'a2', 20, "2021-07-18"),
+ | (2, 'a2_2', 30, "2021-07-18")
+ """.stripMargin)
+
+ assertResult(expectedOperationtype) {
+ getLastCommitMetadata(spark,
s"${tmp.getCanonicalPath}/$tableName").getOperationType
+ }
+
+ if (expectedOperationtype == WriteOperationType.UPSERT) {
+ // dedup should happen within same batch being ingested and existing
records on storage should get updated
+ checkAnswer(s"select id, name, price, dt from $tableName order by id")(
+ Seq(1, "a1_1", 10.0, "2021-07-18"),
+ Seq(2, "a2_2", 30.0, "2021-07-18")
+ )
+ } else {
+ if (insertDupPolicy == NONE_INSERT_DUP_POLICY) {
+ // no dedup across batches
+ checkAnswer(s"select id, name, price, dt from $tableName order by
id")(
+ Seq(1, "a1", 10.0, "2021-07-18"),
+ Seq(1, "a1_1", 10.0, "2021-07-18"),
+ // Seq(2, "a2", 20.0, "2021-07-18"), // preCombine within same
batch kicks in if preCombine is set
+ Seq(2, "a2_2", 30.0, "2021-07-18")
+ )
+ } else if (insertDupPolicy == DROP_INSERT_DUP_POLICY) {
+ checkAnswer(s"select id, name, price, dt from $tableName order by
id")(
+ Seq(1, "a1", 10.0, "2021-07-18"),
+ // Seq(2, "a2", 20.0, "2021-07-18"), // preCombine within same
batch kicks in if preCombine is set
+ Seq(2, "a2_2", 30.0, "2021-07-18")
+ )
+ }
+ }
+ }
+ spark.sessionState.conf.unsetConf("hoodie.sql.write.operation")
+ spark.sessionState.conf.unsetConf("hoodie.sql.insert.mode")
+ spark.sessionState.conf.unsetConf("hoodie.datasource.insert.dup.policy")
+ }
+
+ def ingestAndValidateExceptionBulkInsertAndDropDupPolicy(tableType: String,
tableName: String, tmp: File,
Review Comment:
Can remove this method; I don't see it being used.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]