This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 2fd72c9c2f5 [HUDI-6639] Rename hoodie.sql.write.operation to
hoodie.spark.sql.insert.into.operation (#9359)
2fd72c9c2f5 is described below
commit 2fd72c9c2f5d65bbe8aec20f425815e580773f66
Author: Amrish Lal <[email protected]>
AuthorDate: Fri Aug 4 22:19:01 2023 -0700
[HUDI-6639] Rename hoodie.sql.write.operation to
hoodie.spark.sql.insert.into.operation (#9359)
---
.../scala/org/apache/hudi/DataSourceOptions.scala | 14 ++++++------
.../spark/sql/hudi/ProvidesHoodieConfig.scala | 10 ++++-----
.../apache/spark/sql/hudi/TestCDCForSparkSQL.scala | 5 +++--
.../apache/spark/sql/hudi/TestCreateTable.scala | 4 ++--
.../sql/hudi/TestHoodieTableValuedFunction.scala | 5 +++--
.../apache/spark/sql/hudi/TestInsertTable.scala | 26 +++++++++++-----------
.../org/apache/spark/sql/hudi/TestSpark3DDL.scala | 18 +++++++--------
7 files changed, 42 insertions(+), 40 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index e8c247a7c03..59c2a60a3ad 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -441,7 +441,7 @@ object DataSourceWriteOptions {
.markAdvanced()
.deprecatedAfter("0.14.0")
.withDocumentation("When set to true, the sql insert statement will use
bulk insert. " +
- "This config is deprecated as of 0.14.0. Please use
hoodie.sql.write.operation instead.")
+ "This config is deprecated as of 0.14.0. Please use
hoodie.spark.sql.insert.into.operation instead.")
@Deprecated
val SQL_INSERT_MODE: ConfigProperty[String] = ConfigProperty
@@ -452,7 +452,7 @@ object DataSourceWriteOptions {
"For upsert mode, insert statement do the upsert operation for the
pk-table which will update the duplicate record." +
"For strict mode, insert statement will keep the primary key uniqueness
constraint which do not allow duplicate record." +
"While for non-strict mode, hudi just do the insert operation for the
pk-table. This config is deprecated as of 0.14.0. Please use " +
- "hoodie.sql.write.operation and hoodie.datasource.insert.dup.policy as
you see fit.")
+ "hoodie.spark.sql.insert.into.operation and
hoodie.datasource.insert.dup.policy as you see fit.")
val COMMIT_METADATA_KEYPREFIX: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.write.commitmeta.key.prefix")
@@ -528,13 +528,13 @@ object DataSourceWriteOptions {
val MAKE_NEW_COLUMNS_NULLABLE: ConfigProperty[java.lang.Boolean] =
HoodieCommonConfig.MAKE_NEW_COLUMNS_NULLABLE
- val SQL_WRITE_OPERATION: ConfigProperty[String] = ConfigProperty
- .key("hoodie.sql.write.operation")
- .defaultValue("insert")
- .withValidValues("bulk_insert","insert","upsert")
+ val SPARK_SQL_INSERT_INTO_OPERATION: ConfigProperty[String] = ConfigProperty
+ .key("hoodie.spark.sql.insert.into.operation")
+ .defaultValue(WriteOperationType.INSERT.value())
+ .withValidValues(WriteOperationType.BULK_INSERT.value(),
WriteOperationType.INSERT.value(), WriteOperationType.UPSERT.value())
.withDocumentation("Sql write operation to use with INSERT_INTO spark sql
command. This comes with 3 possible values, bulk_insert, " +
"insert and upsert. bulk_insert is generally meant for initial loads and
is known to be performant compared to insert. But bulk_insert may not " +
- "do small file managmeent. If you prefer hudi to automatically managee
small files, then you can go with \"insert\". There is no precombine " +
+ "do small file management. If you prefer hudi to automatically manage
small files, then you can go with \"insert\". There is no precombine " +
"(if there are duplicates within the same batch being ingested, same
dups will be ingested) with bulk_insert and insert and there is no index " +
"look up as well. If you may use INSERT_INTO for mutable dataset, then
you may have to set this config value to \"upsert\". With upsert, you will " +
"get both precombine and updates to existing records on storage is also
honored. If not, you may see duplicates. ")
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index c66dcc19549..f85032790dd 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -107,8 +107,8 @@ trait ProvidesHoodieConfig extends Logging {
/**
* Deduce the sql write operation for INSERT_INTO
*/
- private def deduceSqlWriteOperation(isOverwritePartition: Boolean,
isOverwriteTable: Boolean,
- sqlWriteOperation: String): String = {
+ private def deduceSparkSqlInsertIntoWriteOperation(isOverwritePartition:
Boolean, isOverwriteTable: Boolean,
+ sqlWriteOperation:
String): String = {
if (isOverwriteTable) {
INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL
} else if (isOverwritePartition) {
@@ -198,9 +198,9 @@ trait ProvidesHoodieConfig extends Logging {
val insertMode =
InsertMode.of(combinedOpts.getOrElse(DataSourceWriteOptions.SQL_INSERT_MODE.key,
DataSourceWriteOptions.SQL_INSERT_MODE.defaultValue()))
val insertModeSet = combinedOpts.contains(SQL_INSERT_MODE.key)
- val sqlWriteOperationOpt = combinedOpts.get(SQL_WRITE_OPERATION.key())
+ val sqlWriteOperationOpt =
combinedOpts.get(SPARK_SQL_INSERT_INTO_OPERATION.key())
val sqlWriteOperationSet = sqlWriteOperationOpt.nonEmpty
- val sqlWriteOperation =
sqlWriteOperationOpt.getOrElse(SQL_WRITE_OPERATION.defaultValue())
+ val sqlWriteOperation =
sqlWriteOperationOpt.getOrElse(SPARK_SQL_INSERT_INTO_OPERATION.defaultValue())
val insertDupPolicyOpt = combinedOpts.get(INSERT_DUP_POLICY.key())
val insertDupPolicySet = insertDupPolicyOpt.nonEmpty
val insertDupPolicy = combinedOpts.getOrElse(INSERT_DUP_POLICY.key(),
INSERT_DUP_POLICY.defaultValue())
@@ -220,7 +220,7 @@ trait ProvidesHoodieConfig extends Logging {
deduceOperation(enableBulkInsert, isOverwritePartition,
isOverwriteTable, dropDuplicate,
isNonStrictMode, isPartitionedTable, combineBeforeInsert,
insertMode, autoGenerateRecordKeys)
} else {
- deduceSqlWriteOperation(isOverwritePartition, isOverwriteTable,
sqlWriteOperation)
+ deduceSparkSqlInsertIntoWriteOperation(isOverwritePartition,
isOverwriteTable, sqlWriteOperation)
}
)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCDCForSparkSQL.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCDCForSparkSQL.scala
index e78c3261ea6..a799ce8f787 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCDCForSparkSQL.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCDCForSparkSQL.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.hudi
import org.apache.hudi.DataSourceReadOptions._
+import org.apache.hudi.DataSourceWriteOptions.SPARK_SQL_INSERT_INTO_OPERATION
import org.apache.hudi.common.table.HoodieTableMetaClient
import
org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.{DATA_BEFORE,
DATA_BEFORE_AFTER, OP_KEY_ONLY}
import org.apache.spark.sql.DataFrame
@@ -102,7 +103,7 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase {
withTempDir { tmp =>
val tableName = generateTableName
val basePath = s"${tmp.getCanonicalPath}/$tableName"
- spark.sql("set hoodie.sql.write.operation=upsert")
+ spark.sql("set " + SPARK_SQL_INSERT_INTO_OPERATION.key + "=upsert")
val otherTableProperties = if (tableType == "mor") {
"'hoodie.compact.inline'='true',
'hoodie.compact.inline.max.delta.commits'='2',"
} else {
@@ -216,7 +217,7 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase {
}
}
}
- spark.sessionState.conf.unsetConf("hoodie.sql.write.operation")
+ spark.sessionState.conf.unsetConf(SPARK_SQL_INSERT_INTO_OPERATION.key)
}
/**
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
index fdf552be655..e95c5cc0f03 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
@@ -455,7 +455,7 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
withTempDir { tmp =>
val parentPath = tmp.getCanonicalPath
val tableName1 = generateTableName
- spark.sql("set hoodie.sql.write.operation=upsert")
+ spark.sql("set " + SPARK_SQL_INSERT_INTO_OPERATION.key + "=upsert")
spark.sql(
s"""
|create table $tableName1 (
@@ -514,7 +514,7 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
Seq(1, "a2", 1100)
)
}
- spark.sessionState.conf.unsetConf("hoodie.sql.write.operation")
+ spark.sessionState.conf.unsetConf(SPARK_SQL_INSERT_INTO_OPERATION.key)
}
test("Test Create ro/rt Table In The Wrong Way") {
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieTableValuedFunction.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieTableValuedFunction.scala
index 98ec634a898..1809a7e2f44 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieTableValuedFunction.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieTableValuedFunction.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.hudi
+import org.apache.hudi.DataSourceWriteOptions.SPARK_SQL_INSERT_INTO_OPERATION
import org.apache.hudi.HoodieSparkUtils
import org.apache.spark.sql.functions.{col, from_json}
@@ -27,7 +28,7 @@ class TestHoodieTableValuedFunction extends
HoodieSparkSqlTestBase {
withTempDir { tmp =>
Seq("cow", "mor").foreach { tableType =>
val tableName = generateTableName
- spark.sql("set hoodie.sql.write.operation=upsert")
+ spark.sql("set " + SPARK_SQL_INSERT_INTO_OPERATION.key + "=upsert")
spark.sql(
s"""
|create table $tableName (
@@ -81,7 +82,7 @@ class TestHoodieTableValuedFunction extends
HoodieSparkSqlTestBase {
}
}
}
- spark.sessionState.conf.unsetConf("hoodie.sql.write.operation")
+ spark.sessionState.conf.unsetConf(SPARK_SQL_INSERT_INTO_OPERATION.key)
}
test(s"Test hudi_table_changes latest_state") {
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index f80906e9d7b..5822a1ca679 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -833,7 +833,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
}
test("Test bulk insert with insert overwrite table") {
- withSQLConf("hoodie.sql.write.operation" -> "bulk_insert") {
+ withSQLConf(SPARK_SQL_INSERT_INTO_OPERATION.key ->
WriteOperationType.BULK_INSERT.value()) {
withRecordType()(withTempDir { tmp =>
Seq("cow", "mor").foreach { tableType =>
withTable(generateTableName) { nonPartitionedTable =>
@@ -866,7 +866,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
}
test("Test bulk insert with insert overwrite partition") {
- withSQLConf("hoodie.sql.write.operation" -> "bulk_insert") {
+ withSQLConf(SPARK_SQL_INSERT_INTO_OPERATION.key ->
WriteOperationType.BULK_INSERT.value()) {
withRecordType()(withTempDir { tmp =>
Seq("cow", "mor").foreach { tableType =>
withTable(generateTableName) { partitionedTable =>
@@ -1674,7 +1674,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
* When neither of strict mode nor sql.write.operation is set, sql write
operation takes precedence and default value is chosen.
*/
test("Test sql write operation with INSERT_INTO No explicit configs") {
- spark.sessionState.conf.unsetConf("hoodie.sql.write.operation")
+ spark.sessionState.conf.unsetConf(SPARK_SQL_INSERT_INTO_OPERATION.key)
spark.sessionState.conf.unsetConf("hoodie.sql.insert.mode")
spark.sessionState.conf.unsetConf("hoodie.datasource.insert.dup.policy")
spark.sessionState.conf.unsetConf("hoodie.datasource.write.operation")
@@ -1693,7 +1693,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
Seq(WriteOperationType.INSERT, WriteOperationType.BULK_INSERT,
WriteOperationType.UPSERT).foreach { operation =>
withTable(generateTableName) { tableName =>
ingestAndValidateData(tableType, tableName, tmp, operation,
- List("set hoodie.sql.write.operation = " + operation.value(),
"set hoodie.sql.insert.mode = upsert"))
+ List("set " + SPARK_SQL_INSERT_INTO_OPERATION.key + " = " +
operation.value(), "set hoodie.sql.insert.mode = upsert"))
}
}
}
@@ -1706,7 +1706,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
Seq(WriteOperationType.INSERT, WriteOperationType.BULK_INSERT,
WriteOperationType.UPSERT).foreach { operation =>
withTable(generateTableName) { tableName =>
ingestAndValidateData(tableType, tableName, tmp, operation,
- List("set hoodie.sql.write.operation = " + operation.value()))
+ List("set " + SPARK_SQL_INSERT_INTO_OPERATION.key + " = " +
operation.value()))
}
}
}
@@ -1714,7 +1714,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
}
test("Test sql write operation with INSERT_INTO override only strict mode") {
- spark.sessionState.conf.unsetConf("hoodie.sql.write.operation")
+ spark.sessionState.conf.unsetConf(SPARK_SQL_INSERT_INTO_OPERATION.key)
spark.sessionState.conf.unsetConf("hoodie.sql.insert.mode")
spark.sessionState.conf.unsetConf("hoodie.datasource.insert.dup.policy")
spark.sessionState.conf.unsetConf("hoodie.datasource.write.operation")
@@ -1789,7 +1789,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
Seq(2, "a2_2", 30.0, "2021-07-18")
)
}
- spark.sessionState.conf.unsetConf("hoodie.sql.write.operation")
+ spark.sessionState.conf.unsetConf(SPARK_SQL_INSERT_INTO_OPERATION.key)
spark.sessionState.conf.unsetConf("hoodie.sql.insert.mode")
spark.sessionState.conf.unsetConf("hoodie.datasource.insert.dup.policy")
spark.sessionState.conf.unsetConf("hoodie.datasource.write.operation")
@@ -1802,7 +1802,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
Seq(NONE_INSERT_DUP_POLICY, DROP_INSERT_DUP_POLICY).foreach {
dupPolicy =>
withTable(generateTableName) { tableName =>
ingestAndValidateDataDupPolicy(tableType, tableName, tmp,
operation,
- List("set hoodie.sql.write.operation = " + operation.value(),
"set " + DataSourceWriteOptions.INSERT_DUP_POLICY.key() + " = " + dupPolicy),
+ List("set " + SPARK_SQL_INSERT_INTO_OPERATION.key + " = " +
operation.value(), "set " + DataSourceWriteOptions.INSERT_DUP_POLICY.key() + "
= " + dupPolicy),
dupPolicy)
}
}
@@ -1817,7 +1817,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
val dupPolicy = NONE_INSERT_DUP_POLICY
withTable(generateTableName) { tableName =>
ingestAndValidateDataDupPolicy(tableType, tableName, tmp,
operation,
- List("set hoodie.sql.write.operation = " + operation.value(),
"set " + DataSourceWriteOptions.INSERT_DUP_POLICY.key() + " = " + dupPolicy),
+ List("set " + SPARK_SQL_INSERT_INTO_OPERATION.key + " = " +
operation.value(), "set " + DataSourceWriteOptions.INSERT_DUP_POLICY.key() + "
= " + dupPolicy),
dupPolicy)
}
}
@@ -1831,7 +1831,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
val dupPolicy = DROP_INSERT_DUP_POLICY
withTable(generateTableName) { tableName =>
ingestAndValidateDropDupPolicyBulkInsert(tableType, tableName, tmp,
operation,
- List("set hoodie.sql.write.operation = " + operation.value(),
+ List("set " + SPARK_SQL_INSERT_INTO_OPERATION.key + " = " +
operation.value(),
"set " + DataSourceWriteOptions.INSERT_DUP_POLICY.key() + " = "
+ dupPolicy))
}
}
@@ -1845,7 +1845,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
val dupPolicy = FAIL_INSERT_DUP_POLICY
withTable(generateTableName) { tableName =>
ingestAndValidateDataDupPolicy(tableType, tableName, tmp,
operation,
- List("set hoodie.sql.write.operation = " + operation.value(),
"set " + DataSourceWriteOptions.INSERT_DUP_POLICY.key() + " = " + dupPolicy),
+ List("set " + SPARK_SQL_INSERT_INTO_OPERATION.key + " = " +
operation.value(), "set " + DataSourceWriteOptions.INSERT_DUP_POLICY.key() + "
= " + dupPolicy),
dupPolicy, true)
}
}
@@ -1947,7 +1947,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
}
}
}
- spark.sessionState.conf.unsetConf("hoodie.sql.write.operation")
+ spark.sessionState.conf.unsetConf(SPARK_SQL_INSERT_INTO_OPERATION.key)
spark.sessionState.conf.unsetConf("hoodie.sql.insert.mode")
spark.sessionState.conf.unsetConf("hoodie.datasource.insert.dup.policy")
spark.sessionState.conf.unsetConf("hoodie.datasource.write.operation")
@@ -1991,7 +1991,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
throw root
}
}
- spark.sessionState.conf.unsetConf("hoodie.sql.write.operation")
+ spark.sessionState.conf.unsetConf(SPARK_SQL_INSERT_INTO_OPERATION.key)
spark.sessionState.conf.unsetConf("hoodie.sql.insert.mode")
spark.sessionState.conf.unsetConf("hoodie.datasource.insert.dup.policy")
spark.sessionState.conf.unsetConf("hoodie.datasource.write.operation")
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
index 1858d72663e..0b2b01cbec9 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.hudi
import org.apache.hadoop.fs.Path
-import org.apache.hudi.DataSourceWriteOptions.{PARTITIONPATH_FIELD_OPT_KEY,
PRECOMBINE_FIELD_OPT_KEY, RECORDKEY_FIELD_OPT_KEY, TABLE_NAME}
+import org.apache.hudi.DataSourceWriteOptions.{PARTITIONPATH_FIELD_OPT_KEY,
PRECOMBINE_FIELD_OPT_KEY, RECORDKEY_FIELD_OPT_KEY,
SPARK_SQL_INSERT_INTO_OPERATION, TABLE_NAME}
import org.apache.hudi.QuickstartUtils.{DataGenerator, convertToStringList,
getQuickstartWriteConfigs}
import org.apache.hudi.common.config.HoodieStorageConfig
import org.apache.hudi.common.model.HoodieRecord
@@ -74,7 +74,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
val tableName = generateTableName
val tablePath = s"${new Path(tmp.getCanonicalPath,
tableName).toUri.toString}"
if (HoodieSparkUtils.gteqSpark3_1) {
- spark.sql("set hoodie.sql.write.operation=upsert")
+ spark.sql("set " + SPARK_SQL_INSERT_INTO_OPERATION.key + "=upsert")
spark.sql("set hoodie.schema.on.read.enable=true")
// NOTE: This is required since as this tests use type coercions
which were only permitted in Spark 2.x
// and are disallowed now by default in Spark 3.x
@@ -135,7 +135,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
)
spark.sessionState.catalog.dropTable(TableIdentifier(tableName),
true, true)
spark.sessionState.catalog.refreshTable(TableIdentifier(tableName))
- spark.sessionState.conf.unsetConf("hoodie.sql.write.operation")
+
spark.sessionState.conf.unsetConf(SPARK_SQL_INSERT_INTO_OPERATION.key)
}
}
})
@@ -237,7 +237,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
val tablePath = s"${new Path(tmp.getCanonicalPath,
tableName).toUri.toString}"
if (HoodieSparkUtils.gteqSpark3_1) {
spark.sql("set hoodie.schema.on.read.enable=true")
- spark.sql("set hoodie.sql.write.operation=upsert")
+ spark.sql("set " + SPARK_SQL_INSERT_INTO_OPERATION.key + "=upsert")
// NOTE: This is required since as this tests use type coercions
which were only permitted in Spark 2.x
// and are disallowed now by default in Spark 3.x
spark.sql("set spark.sql.storeAssignmentPolicy=legacy")
@@ -330,7 +330,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
spark.sql(s"select id, col1_new, col2 from $tableName where id = 1
or id = 6 or id = 2 or id = 11 order by id").show(false)
}
}
- spark.sessionState.conf.unsetConf("hoodie.sql.write.operation")
+ spark.sessionState.conf.unsetConf(SPARK_SQL_INSERT_INTO_OPERATION.key)
})
}
@@ -341,7 +341,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
val tablePath = s"${new Path(tmp.getCanonicalPath,
tableName).toUri.toString}"
if (HoodieSparkUtils.gteqSpark3_1) {
spark.sql("set hoodie.schema.on.read.enable=true")
- spark.sql("set hoodie.sql.write.operation=upsert")
+ spark.sql("set " + SPARK_SQL_INSERT_INTO_OPERATION.key + "=upsert")
spark.sql(
s"""
|create table $tableName (
@@ -382,7 +382,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
)
}
}
- spark.sessionState.conf.unsetConf("hoodie.sql.write.operation")
+ spark.sessionState.conf.unsetConf(SPARK_SQL_INSERT_INTO_OPERATION.key)
})
}
@@ -522,7 +522,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
val tablePath = s"${new Path(tmp.getCanonicalPath,
tableName).toUri.toString}"
if (HoodieSparkUtils.gteqSpark3_1) {
spark.sql("set hoodie.schema.on.read.enable=true")
- spark.sql("set hoodie.sql.write.operation=upsert")
+ spark.sql("set " + SPARK_SQL_INSERT_INTO_OPERATION.key + "=upsert")
spark.sql(
s"""
|create table $tableName (
@@ -601,7 +601,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
)
}
}
- spark.sessionState.conf.unsetConf("hoodie.sql.write.operation")
+ spark.sessionState.conf.unsetConf(SPARK_SQL_INSERT_INTO_OPERATION.key)
})
}