This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 67af016f21f [HUDI-6469] Revert HUDI-6311 (#9115)
67af016f21f is described below
commit 67af016f21fe77b1df89827400a1e14a960c7258
Author: Jon Vexler <[email protected]>
AuthorDate: Wed Jul 5 10:19:01 2023 -0400
[HUDI-6469] Revert HUDI-6311 (#9115)
#8875 changed the behavior of spark-sql insert into command to
use the bulk insert row writer path. This PR reverts that change as
we plan to address the issue more holistically by synergising configs
between spark-sql and spark datasource with respect to the write operation.
---------
Co-authored-by: Jonathan Vexler <=>
Co-authored-by: Sagar Sumit <[email protected]>
---
.../spark/sql/hudi/ProvidesHoodieConfig.scala | 111 +--
.../command/InsertIntoHoodieTableCommand.scala | 6 +-
.../src/test/resources/sql-statements.sql | 5 -
.../hudi/functional/TestInsertIntoOperation.scala | 3 +
.../sql/hudi/TestAlterTableDropPartition.scala | 40 +-
.../apache/spark/sql/hudi/TestCDCForSparkSQL.scala | 6 +-
.../apache/spark/sql/hudi/TestCreateTable.scala | 9 +-
.../sql/hudi/TestHoodieTableValuedFunction.scala | 15 +-
.../apache/spark/sql/hudi/TestInsertTable.scala | 2 +-
.../spark/sql/hudi/TestMergeIntoLogOnlyTable.scala | 9 +-
.../org/apache/spark/sql/hudi/TestSpark3DDL.scala | 832 ++++++++++-----------
.../spark/sql/hudi/TestTimeTravelTable.scala | 12 +-
.../TestHoodiePruneFileSourcePartitions.scala | 22 +-
.../sql/hudi/procedure/TestCleanProcedure.scala | 23 +-
.../hudi/procedure/TestClusteringProcedure.scala | 19 +-
.../sql/hudi/procedure/TestCommitsProcedure.scala | 34 +-
.../hudi/procedure/TestCompactionProcedure.scala | 23 +-
17 files changed, 538 insertions(+), 633 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index b68bad4d0f5..d9566f460c7 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -85,64 +85,6 @@ trait ProvidesHoodieConfig extends Logging {
defaultOpts = defaultOpts, overridingOpts = overridingOpts)
}
- /**
- * Get the insert operation.
- * See if we are able to set bulk insert, else use deduceOperation
- */
- private def deduceWriteOperationForInsertInfo(isPartitionedTable: Boolean,
- isOverwritePartition: Boolean,
- isOverwriteTable: Boolean,
- insertModeSet: Boolean,
- dropDuplicate: Option[String],
- enableBulkInsert:
Option[String],
- isInsertInto: Boolean,
- isNonStrictMode: Boolean,
- combineBeforeInsert: Boolean):
String = {
- val canEnforceNonStrictMode = !insertModeSet || isNonStrictMode
- //if selected configs are not set, instead of using the default we assume
the values to be those that enable bulk_insert
- (isInsertInto, canEnforceNonStrictMode, enableBulkInsert.getOrElse("true"),
- dropDuplicate.getOrElse("false"), isOverwritePartition,
isPartitionedTable) match {
- case (true, true, "true", "false", false, _) =>
BULK_INSERT_OPERATION_OPT_VAL
- case (true, true, "true", "false", true, false) =>
BULK_INSERT_OPERATION_OPT_VAL
-
- //if config is set such that we cant make it bulk insert, we need to use
defaults for unset configs
- case _ =>
deduceOperation(enableBulkInsert.getOrElse(SQL_ENABLE_BULK_INSERT.defaultValue).toBoolean,
- isOverwritePartition, isOverwriteTable,
dropDuplicate.getOrElse(INSERT_DROP_DUPS.defaultValue).toBoolean,
- isNonStrictMode, isPartitionedTable, combineBeforeInsert)
- }
- }
-
- /**
- * Deduce the insert operation
- */
- private def deduceOperation(enableBulkInsert: Boolean, isOverwritePartition:
Boolean, isOverwriteTable: Boolean,
- dropDuplicate: Boolean, isNonStrictMode:
Boolean, isPartitionedTable: Boolean,
- hasPrecombineColumn: Boolean): String = {
- (enableBulkInsert, isOverwritePartition, isOverwriteTable, dropDuplicate,
isNonStrictMode, isPartitionedTable) match {
- case (true, _, _, _, false, _) =>
- throw new IllegalArgumentException(s"Table with primaryKey can only
use bulk insert in non-strict mode.")
- case (true, _, _, true, _, _) =>
- throw new IllegalArgumentException(s"Bulk insert cannot support drop
duplication." +
- s" Please disable $INSERT_DROP_DUPS and try again.")
- // Bulk insert with overwrite table
- case (true, false, true, _, _, _) =>
- BULK_INSERT_OPERATION_OPT_VAL
- // Bulk insert with overwrite table partition
- case (true, true, false, _, _, true) =>
- BULK_INSERT_OPERATION_OPT_VAL
- // insert overwrite table
- case (false, false, true, _, _, _) =>
INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL
- // insert overwrite partition
- case (false, true, false, _, _, true) =>
INSERT_OVERWRITE_OPERATION_OPT_VAL
- // disable dropDuplicate, and provide preCombineKey, use the upsert
operation for strict and upsert mode.
- case (false, false, false, false, false, _) if hasPrecombineColumn =>
UPSERT_OPERATION_OPT_VAL
- // if table is pk table and has enableBulkInsert use bulk insert for
non-strict mode.
- case (true, false, false, _, true, _) => BULK_INSERT_OPERATION_OPT_VAL
- // for the rest case, use the insert operation
- case _ => INSERT_OPERATION_OPT_VAL
- }
- }
-
/**
* Build the default config for insert.
*
@@ -153,8 +95,7 @@ trait ProvidesHoodieConfig extends Logging {
isOverwritePartition: Boolean,
isOverwriteTable: Boolean,
insertPartitions: Map[String, Option[String]] =
Map.empty,
- extraOptions: Map[String, String],
- isInsertInto: Boolean = false): Map[String,
String] = {
+ extraOptions: Map[String, String]): Map[String,
String] = {
if (insertPartitions.nonEmpty &&
(insertPartitions.keys.toSet !=
hoodieCatalogTable.partitionFields.toSet)) {
@@ -166,7 +107,7 @@ trait ProvidesHoodieConfig extends Logging {
val tableType = hoodieCatalogTable.tableTypeName
val tableConfig = hoodieCatalogTable.tableConfig
- var combinedOpts: Map[String, String] = combineOptions(hoodieCatalogTable,
tableConfig, sparkSession.sqlContext.conf,
+ val combinedOpts: Map[String, String] = combineOptions(hoodieCatalogTable,
tableConfig, sparkSession.sqlContext.conf,
defaultOpts = Map.empty, overridingOpts = extraOptions)
val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable,
tableConfig, extraOptions)
@@ -182,21 +123,45 @@ trait ProvidesHoodieConfig extends Logging {
val keyGeneratorClassName = Option(tableConfig.getKeyGeneratorClassName)
.getOrElse(classOf[ComplexKeyGenerator].getCanonicalName)
- val enableBulkInsert = combinedOpts.get(SQL_ENABLE_BULK_INSERT.key)
- val dropDuplicate = combinedOpts.get(INSERT_DROP_DUPS.key)
+ val enableBulkInsert =
combinedOpts.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key,
+ DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
+ val dropDuplicate = sparkSession.conf
+
.getOption(INSERT_DROP_DUPS.key).getOrElse(INSERT_DROP_DUPS.defaultValue).toBoolean
- val insertModeOpt = combinedOpts.get(SQL_INSERT_MODE.key)
- val insertModeSet = insertModeOpt.nonEmpty
- val insertMode =
InsertMode.of(insertModeOpt.getOrElse(SQL_INSERT_MODE.defaultValue()))
+ val insertMode =
InsertMode.of(combinedOpts.getOrElse(DataSourceWriteOptions.SQL_INSERT_MODE.key,
+ DataSourceWriteOptions.SQL_INSERT_MODE.defaultValue()))
val isNonStrictMode = insertMode == InsertMode.NON_STRICT
val isPartitionedTable = hoodieCatalogTable.partitionFields.nonEmpty
val combineBeforeInsert = hoodieCatalogTable.preCombineKey.nonEmpty &&
hoodieCatalogTable.primaryKeys.nonEmpty
// NOTE: Target operation could be overridden by the user, therefore if it
has been provided as an input
// we'd prefer that value over auto-deduced operation. Otherwise, we
deduce target operation type
- val operation = combinedOpts.getOrElse(OPERATION.key,
- deduceWriteOperationForInsertInfo(isPartitionedTable,
isOverwritePartition, isOverwriteTable, insertModeSet, dropDuplicate,
- enableBulkInsert, isInsertInto, isNonStrictMode, combineBeforeInsert))
+ val operationOverride =
combinedOpts.get(DataSourceWriteOptions.OPERATION.key)
+ val operation = operationOverride.getOrElse {
+ (enableBulkInsert, isOverwritePartition, isOverwriteTable,
dropDuplicate, isNonStrictMode, isPartitionedTable) match {
+ case (true, _, _, _, false, _) =>
+ throw new IllegalArgumentException(s"Table with primaryKey can not
use bulk insert in ${insertMode.value()} mode.")
+ case (true, _, _, true, _, _) =>
+ throw new IllegalArgumentException(s"Bulk insert cannot support drop
duplication." +
+ s" Please disable $INSERT_DROP_DUPS and try again.")
+ // Bulk insert with overwrite table
+ case (true, false, true, _, _, _) =>
+ BULK_INSERT_OPERATION_OPT_VAL
+ // Bulk insert with overwrite table partition
+ case (true, true, false, _, _, true) =>
+ BULK_INSERT_OPERATION_OPT_VAL
+ // insert overwrite table
+ case (false, false, true, _, _, _) =>
INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL
+ // insert overwrite partition
+ case (false, true, false, _, _, true) =>
INSERT_OVERWRITE_OPERATION_OPT_VAL
+ // disable dropDuplicate, and provide preCombineKey, use the upsert
operation for strict and upsert mode.
+ case (false, false, false, false, false, _) if combineBeforeInsert =>
UPSERT_OPERATION_OPT_VAL
+ // if table is pk table and has enableBulkInsert use bulk insert for
non-strict mode.
+ case (true, false, false, _, true, _) => BULK_INSERT_OPERATION_OPT_VAL
+ // for the rest case, use the insert operation
+ case _ => INSERT_OPERATION_OPT_VAL
+ }
+ }
val payloadClassName = if (operation == UPSERT_OPERATION_OPT_VAL &&
tableType == COW_TABLE_TYPE_OPT_VAL && insertMode == InsertMode.STRICT) {
@@ -383,10 +348,10 @@ object ProvidesHoodieConfig {
// overridden by any source)s
//
def combineOptions(catalogTable: HoodieCatalogTable,
- tableConfig: HoodieTableConfig,
- sqlConf: SQLConf,
- defaultOpts: Map[String, String],
- overridingOpts: Map[String, String] = Map.empty):
Map[String, String] = {
+ tableConfig: HoodieTableConfig,
+ sqlConf: SQLConf,
+ defaultOpts: Map[String, String],
+ overridingOpts: Map[String, String] = Map.empty):
Map[String, String] = {
// NOTE: Properties are merged in the following order of priority (first
has the highest priority, last has the
// lowest, which is inverse to the ordering in the code):
// 1. (Extra) Option overrides
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
index c67b6c40d21..c1a71125927 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hudi.command
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.{HoodieSparkSqlWriter, SparkAdapterSupport}
import org.apache.spark.internal.Logging
+import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HoodieCatalogTable}
import org.apache.spark.sql.catalyst.expressions.{Alias, Cast, Literal,
NamedExpression}
import org.apache.spark.sql.catalyst.plans.QueryPlan
@@ -27,10 +28,9 @@ import
org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
import org.apache.spark.sql.hudi.ProvidesHoodieConfig
+import
org.apache.spark.sql.hudi.command.HoodieLeafRunnableCommand.stripMetaFieldAttributes
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{StringType, StructField, StructType}
-import org.apache.spark.sql._
-import
org.apache.spark.sql.hudi.command.HoodieLeafRunnableCommand.stripMetaFieldAttributes
/**
* Command for insert into Hudi table.
@@ -101,7 +101,7 @@ object InsertIntoHoodieTableCommand extends Logging with
ProvidesHoodieConfig wi
isOverWritePartition = overwrite
}
- val config = buildHoodieInsertConfig(catalogTable, sparkSession,
isOverWritePartition, isOverWriteTable, partitionSpec, extraOptions, true)
+ val config = buildHoodieInsertConfig(catalogTable, sparkSession,
isOverWritePartition, isOverWriteTable, partitionSpec, extraOptions)
val alignedQuery = alignQueryOutput(query, catalogTable, partitionSpec,
sparkSession.sessionState.conf)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/resources/sql-statements.sql
b/hudi-spark-datasource/hudi-spark/src/test/resources/sql-statements.sql
index cbc3f65a32b..0259f453039 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/resources/sql-statements.sql
+++ b/hudi-spark-datasource/hudi-spark/src/test/resources/sql-statements.sql
@@ -30,11 +30,6 @@ set hoodie.delete.shuffle.parallelism = 1;
| ok |
+----------+
-set hoodie.sql.insert.mode = upsert;
-+----------+
-| ok |
-+----------+
-
# CTAS
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestInsertIntoOperation.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestInsertIntoOperation.scala
index 5f676d58f8a..e05bea2420b 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestInsertIntoOperation.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestInsertIntoOperation.scala
@@ -29,6 +29,8 @@ import scala.collection.JavaConversions._
class TestInsertIntoOperation extends HoodieSparkSqlTestBase {
+ /*
+
/**
* asserts if number of commits = count
* returns true if last commit is bulk insert
@@ -445,4 +447,5 @@ class TestInsertIntoOperation extends
HoodieSparkSqlTestBase {
assert(!assertCommitCountAndIsLastBulkInsert(tableBasePath, 2))
})
}
+ */
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
index 7521555eea4..d1ffa66edf8 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
@@ -414,10 +414,8 @@ class TestAlterTableDropPartition extends
HoodieSparkSqlTestBase {
})
assertTrue(totalDeletedFiles > 0)
- // update data
- withSQLConf("hoodie.sql.insert.mode" -> "upsert") {
- spark.sql(s"""insert into $tableName values (2, "l4", "v1", "2021",
"10", "02")""")
- }
+ // insert data
+ spark.sql(s"""insert into $tableName values (2, "l4", "v1", "2021",
"10", "02")""")
checkAnswer(s"select id, name, ts, year, month, day from $tableName")(
Seq(2, "l4", "v1", "2021", "10", "02")
@@ -554,16 +552,12 @@ class TestAlterTableDropPartition extends
HoodieSparkSqlTestBase {
| partitioned by(ts)
| location '$basePath'
| """.stripMargin)
-
- //need to use upsert so we create log files
- withSQLConf("hoodie.sql.insert.mode" -> "upsert") {
- // Create 5 deltacommits to ensure that it is > default
`hoodie.compact.inline.max.delta.commits`
- spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
- spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
- spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
- spark.sql(s"insert into $tableName values(4, 'a4', 10, 1003)")
- spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004)")
- }
+ // Create 5 deltacommits to ensure that it is > default
`hoodie.compact.inline.max.delta.commits`
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+ spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
+ spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
+ spark.sql(s"insert into $tableName values(4, 'a4', 10, 1003)")
+ spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004)")
val client = HoodieCLIUtils.createHoodieWriteClient(spark, basePath,
Map.empty, Option(tableName))
// Generate the first compaction plan
@@ -602,17 +596,13 @@ class TestAlterTableDropPartition extends
HoodieSparkSqlTestBase {
| partitioned by(ts)
| location '$basePath'
| """.stripMargin)
-
- //need to use upsert so we create log files
- withSQLConf("hoodie.sql.insert.mode" -> "upsert") {
- // Create 5 deltacommits to ensure that it is > default
`hoodie.compact.inline.max.delta.commits`
- // Write everything into the same FileGroup but into separate blocks
- spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
- spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)")
- spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000)")
- spark.sql(s"insert into $tableName values(4, 'a4', 10, 1000)")
- spark.sql(s"insert into $tableName values(5, 'a5', 10, 1000)")
- }
+ // Create 5 deltacommits to ensure that it is > default
`hoodie.compact.inline.max.delta.commits`
+ // Write everything into the same FileGroup but into separate blocks
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+ spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)")
+ spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000)")
+ spark.sql(s"insert into $tableName values(4, 'a4', 10, 1000)")
+ spark.sql(s"insert into $tableName values(5, 'a5', 10, 1000)")
val client = HoodieCLIUtils.createHoodieWriteClient(spark, basePath,
Map.empty, Option(tableName))
// Generate the first log_compaction plan
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCDCForSparkSQL.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCDCForSparkSQL.scala
index 47d7d0cce99..c8fc8b7dfea 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCDCForSparkSQL.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCDCForSparkSQL.scala
@@ -132,14 +132,12 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase {
.build()
spark.sql(s"insert into $tableName values (1, 'a1', 11, 1000), (2,
'a2', 12, 1000), (3, 'a3', 13, 1000)")
-
val commitTime1 =
metaClient.reloadActiveTimeline.lastInstant().get().getTimestamp
val cdcDataOnly1 = cdcDataFrame(basePath, commitTime1.toLong - 1)
cdcDataOnly1.show(false)
assertCDCOpCnt(cdcDataOnly1, 3, 0, 0)
- withSQLConf("hoodie.sql.insert.mode" -> "upsert") {
- spark.sql(s"insert into $tableName values (1, 'a1_v2', 11, 1100)")
- }
+
+ spark.sql(s"insert into $tableName values (1, 'a1_v2', 11, 1100)")
val commitTime2 =
metaClient.reloadActiveTimeline.lastInstant().get().getTimestamp
// here we use `commitTime1` to query the change data in commit 2.
// because `commitTime2` is maybe the ts of the compaction
operation, not the write operation.
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
index 7cfc143be05..74ca5bf6169 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
@@ -424,9 +424,7 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
| location '$parentPath/$tableName1'
""".stripMargin)
spark.sql(s"insert into $tableName1 values (1, 'a1', 1000)")
- withSQLConf("hoodie.sql.insert.mode" -> "upsert") {
- spark.sql(s"insert into $tableName1 values (1, 'a2', 1100)")
- }
+ spark.sql(s"insert into $tableName1 values (1, 'a2', 1100)")
// drop ro and rt table, and recreate them
val roTableName1 = tableName1 + "_ro"
@@ -492,9 +490,8 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
| location '$parentPath/$tableName1'
""".stripMargin)
spark.sql(s"insert into $tableName1 values (1, 'a1', 1000)")
- withSQLConf("hoodie.sql.insert.mode" -> "upsert") {
- spark.sql(s"insert into $tableName1 values (1, 'a2', 1100)")
- }
+ spark.sql(s"insert into $tableName1 values (1, 'a2', 1100)")
+
val roTableName1 = tableName1 + "_ro"
checkExceptionContain(
s"""
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieTableValuedFunction.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieTableValuedFunction.scala
index 6c8f6ec36f8..1b0aa3ff6e1 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieTableValuedFunction.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieTableValuedFunction.scala
@@ -57,14 +57,13 @@ class TestHoodieTableValuedFunction extends
HoodieSparkSqlTestBase {
Seq(3, "a3", 30.0, 1000)
)
- withSQLConf("hoodie.sql.insert.mode" -> "upsert") {
- spark.sql(
- s"""
- | insert into $tableName
- | values (1, 'a1_1', 10, 1100), (2, 'a2_2', 20, 1100), (3,
'a3_3', 30, 1100)
- | """.stripMargin
- )
- }
+ spark.sql(
+ s"""
+ | insert into $tableName
+ | values (1, 'a1_1', 10, 1100), (2, 'a2_2', 20, 1100), (3,
'a3_3', 30, 1100)
+ | """.stripMargin
+ )
+
if (tableType == "cow") {
checkAnswer(s"select id, name, price, ts from
hudi_query('$tableName', 'read_optimized')")(
Seq(1, "a1_1", 10.0, 1100),
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index b51348679e5..54e979486b5 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -566,7 +566,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
| )
""".stripMargin)
checkException(s"insert into $tableName2 values(1, 'a1', 10, 1000)")(
- "Table with primaryKey can only use bulk insert in non-strict mode."
+ "Table with primaryKey can not use bulk insert in strict mode."
)
}
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoLogOnlyTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoLogOnlyTable.scala
index 899fe26036a..48ee872d4d9 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoLogOnlyTable.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoLogOnlyTable.scala
@@ -42,12 +42,9 @@ class TestMergeIntoLogOnlyTable extends
HoodieSparkSqlTestBase {
| hoodie.compact.inline = 'true'
| )
""".stripMargin)
- //need to upsert so we only create log file
- withSQLConf("hoodie.sql.insert.mode" -> "upsert") {
- spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
- spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)")
- spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000)")
- }
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+ spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)")
+ spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000)")
// 3 commits will not trigger compaction, so it should be log only.
assertResult(true)(DataSourceTestUtils.isLogFileOnly(tmp.getCanonicalPath))
checkAnswer(s"select id, name, price, ts from $tableName order by id")(
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
index 16fdce75f36..b1f9347d7e8 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
@@ -71,71 +71,69 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
test("Test alter column types") {
withRecordType()(withTempDir { tmp =>
Seq("cow", "mor").foreach { tableType =>
- withSQLConf("hoodie.sql.insert.mode" -> "upsert") {
- val tableName = generateTableName
- val tablePath = s"${new Path(tmp.getCanonicalPath,
tableName).toUri.toString}"
- if (HoodieSparkUtils.gteqSpark3_1) {
- spark.sql("set hoodie.schema.on.read.enable=true")
- // NOTE: This is required since as this tests use type coercions
which were only permitted in Spark 2.x
- // and are disallowed now by default in Spark 3.x
- spark.sql("set spark.sql.storeAssignmentPolicy=legacy")
- createAndPreparePartitionTable(spark, tableName, tablePath,
tableType)
- // date -> string -> date
- spark.sql(s"alter table $tableName alter column col6 type String")
- checkAnswer(spark.sql(s"select col6 from $tableName where id =
1").collect())(
- Seq("2021-12-25")
- )
- spark.sql(
- s"""
- | insert into $tableName values
- |
(1,1,13.0,100001,101.01,1001.0001,100001.0001,'a000001','2021-12-26','2021-12-25
12:01:01',true,'a01','2021-12-25')
- |""".stripMargin)
- spark.sql(s"alter table $tableName alter column col6 type date")
- checkAnswer(spark.sql(s"select col6 from $tableName where id = 1
or id = 5 order by id").collect())(
- Seq(java.sql.Date.valueOf("2021-12-26")), // value from new file
- Seq(java.sql.Date.valueOf("2021-12-26")) // value from old file
- )
- // int -> double -> decimal
- spark.sql(s"alter table $tableName alter column col0 type double")
- spark.sql(
- s"""
- | insert into $tableName values
- |
(1,1,13.0,100001,101.01,1001.0001,100001.0001,'a000001','2021-12-25','2021-12-25
12:01:01',true,'a01','2021-12-25'),
- |
(6,1,14.0,100001,101.01,1001.0001,100001.0001,'a000001','2021-12-25','2021-12-25
12:01:01',true,'a01','2021-12-25')
- |""".stripMargin)
- spark.sql(s"alter table $tableName alter column col0 type
decimal(16, 4)")
- checkAnswer(spark.sql(s"select col0 from $tableName where id = 1
or id = 6 order by id").collect())(
- Seq(new java.math.BigDecimal("13.0000")),
- Seq(new java.math.BigDecimal("14.0000"))
- )
- // float -> double -> decimal
- spark.sql(s"alter table $tableName alter column col2 type double")
- spark.sql(
- s"""
- | insert into $tableName values
- |
(1,1,13.0,100001,901.01,1001.0001,100001.0001,'a000001','2021-12-25','2021-12-25
12:01:01',true,'a01','2021-12-25'),
- |
(6,1,14.0,100001,601.01,1001.0001,100001.0001,'a000001','2021-12-25','2021-12-25
12:01:01',true,'a01','2021-12-25')
- |""".stripMargin)
- spark.sql(s"alter table $tableName alter column col2 type
decimal(16, 4)")
- checkAnswer(spark.sql(s"select col0, col2 from $tableName where id
= 1 or id = 6 order by id").collect())(
- Seq(new java.math.BigDecimal("13.0000"), new
java.math.BigDecimal("901.0100")),
- Seq(new java.math.BigDecimal("14.0000"), new
java.math.BigDecimal("601.0100"))
- )
- // long -> double -> decimal
- spark.sql(s"alter table $tableName alter column col1 type double")
- spark.sql(
- s"""
- | insert into $tableName values
- |
(1,1,13.0,700001.0,901.01,1001.0001,100001.0001,'a000001','2021-12-25','2021-12-25
12:01:01',true,'a01','2021-12-25')
- |""".stripMargin)
- spark.sql(s"alter table $tableName alter column col1 type
decimal(16, 4)")
- checkAnswer(spark.sql(s"select col0, col2, col1 from $tableName
where id = 1 or id = 6 order by id").collect())(
- Seq(new java.math.BigDecimal("13.0000"), new
java.math.BigDecimal("901.0100"), new java.math.BigDecimal("700001.0000")),
- Seq(new java.math.BigDecimal("14.0000"), new
java.math.BigDecimal("601.0100"), new java.math.BigDecimal("100001.0000"))
- )
- spark.sessionState.catalog.dropTable(TableIdentifier(tableName),
true, true)
- spark.sessionState.catalog.refreshTable(TableIdentifier(tableName))
- }
+ val tableName = generateTableName
+ val tablePath = s"${new Path(tmp.getCanonicalPath,
tableName).toUri.toString}"
+ if (HoodieSparkUtils.gteqSpark3_1) {
+ spark.sql("set hoodie.schema.on.read.enable=true")
+ // NOTE: This is required since as this tests use type coercions
which were only permitted in Spark 2.x
+ // and are disallowed now by default in Spark 3.x
+ spark.sql("set spark.sql.storeAssignmentPolicy=legacy")
+ createAndPreparePartitionTable(spark, tableName, tablePath,
tableType)
+ // date -> string -> date
+ spark.sql(s"alter table $tableName alter column col6 type String")
+ checkAnswer(spark.sql(s"select col6 from $tableName where id =
1").collect())(
+ Seq("2021-12-25")
+ )
+ spark.sql(
+ s"""
+ | insert into $tableName values
+ |
(1,1,13.0,100001,101.01,1001.0001,100001.0001,'a000001','2021-12-26','2021-12-25
12:01:01',true,'a01','2021-12-25')
+ |""".stripMargin)
+ spark.sql(s"alter table $tableName alter column col6 type date")
+ checkAnswer(spark.sql(s"select col6 from $tableName where id = 1 or
id = 5 order by id").collect())(
+ Seq(java.sql.Date.valueOf("2021-12-26")), // value from new file
+ Seq(java.sql.Date.valueOf("2021-12-26")) // value from old file
+ )
+ // int -> double -> decimal
+ spark.sql(s"alter table $tableName alter column col0 type double")
+ spark.sql(
+ s"""
+ | insert into $tableName values
+ |
(1,1,13.0,100001,101.01,1001.0001,100001.0001,'a000001','2021-12-25','2021-12-25
12:01:01',true,'a01','2021-12-25'),
+ |
(6,1,14.0,100001,101.01,1001.0001,100001.0001,'a000001','2021-12-25','2021-12-25
12:01:01',true,'a01','2021-12-25')
+ |""".stripMargin)
+ spark.sql(s"alter table $tableName alter column col0 type
decimal(16, 4)")
+ checkAnswer(spark.sql(s"select col0 from $tableName where id = 1 or
id = 6 order by id").collect())(
+ Seq(new java.math.BigDecimal("13.0000")),
+ Seq(new java.math.BigDecimal("14.0000"))
+ )
+ // float -> double -> decimal
+ spark.sql(s"alter table $tableName alter column col2 type double")
+ spark.sql(
+ s"""
+ | insert into $tableName values
+ |
(1,1,13.0,100001,901.01,1001.0001,100001.0001,'a000001','2021-12-25','2021-12-25
12:01:01',true,'a01','2021-12-25'),
+ |
(6,1,14.0,100001,601.01,1001.0001,100001.0001,'a000001','2021-12-25','2021-12-25
12:01:01',true,'a01','2021-12-25')
+ |""".stripMargin)
+ spark.sql(s"alter table $tableName alter column col2 type
decimal(16, 4)")
+ checkAnswer(spark.sql(s"select col0, col2 from $tableName where id =
1 or id = 6 order by id").collect())(
+ Seq(new java.math.BigDecimal("13.0000"), new
java.math.BigDecimal("901.0100")),
+ Seq(new java.math.BigDecimal("14.0000"), new
java.math.BigDecimal("601.0100"))
+ )
+ // long -> double -> decimal
+ spark.sql(s"alter table $tableName alter column col1 type double")
+ spark.sql(
+ s"""
+ | insert into $tableName values
+ |
(1,1,13.0,700001.0,901.01,1001.0001,100001.0001,'a000001','2021-12-25','2021-12-25
12:01:01',true,'a01','2021-12-25')
+ |""".stripMargin)
+ spark.sql(s"alter table $tableName alter column col1 type
decimal(16, 4)")
+ checkAnswer(spark.sql(s"select col0, col2, col1 from $tableName
where id = 1 or id = 6 order by id").collect())(
+ Seq(new java.math.BigDecimal("13.0000"), new
java.math.BigDecimal("901.0100"), new java.math.BigDecimal("700001.0000")),
+ Seq(new java.math.BigDecimal("14.0000"), new
java.math.BigDecimal("601.0100"), new java.math.BigDecimal("100001.0000"))
+ )
+ spark.sessionState.catalog.dropTable(TableIdentifier(tableName),
true, true)
+ spark.sessionState.catalog.refreshTable(TableIdentifier(tableName))
}
}
})
@@ -233,102 +231,100 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
test("Test alter table properties and add rename drop column") {
withRecordType()(withTempDir { tmp =>
Seq("cow", "mor").foreach { tableType =>
- withSQLConf("hoodie.sql.insert.mode" -> "upsert") {
- val tableName = generateTableName
- val tablePath = s"${new Path(tmp.getCanonicalPath,
tableName).toUri.toString}"
- if (HoodieSparkUtils.gteqSpark3_1) {
- spark.sql("set hoodie.schema.on.read.enable=true")
- // NOTE: This is required since as this tests use type coercions
which were only permitted in Spark 2.x
- // and are disallowed now by default in Spark 3.x
- spark.sql("set spark.sql.storeAssignmentPolicy=legacy")
- createAndPreparePartitionTable(spark, tableName, tablePath,
tableType)
-
- // test set properties
- spark.sql(s"alter table $tableName set tblproperties(comment='it
is a hudi table', 'key1'='value1', 'key2'='value2')")
- val meta =
spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
- assert(meta.comment.get.equals("it is a hudi table"))
- assert(Seq("key1",
"key2").filter(meta.properties.contains(_)).size == 2)
- // test unset properties
- spark.sql(s"alter table $tableName unset tblproperties(comment,
'key1', 'key2')")
- val unsetMeta =
spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
- assert(Seq("key1",
"key2").filter(unsetMeta.properties.contains(_)).size == 0)
- assert(unsetMeta.comment.isEmpty)
- // test forbidden operation.
- checkException(s"Alter table $tableName add columns(col_new1 int
first)")("forbid adjust top-level columns position by using through first
syntax")
- HoodieRecord.HOODIE_META_COLUMNS.subList(0,
HoodieRecord.HOODIE_META_COLUMNS.size - 2).asScala.foreach { f =>
- checkException(s"Alter table $tableName add columns(col_new1 int
after $f)")("forbid adjust the position of ordinary columns between meta
columns")
- }
- Seq("id", "comb", "par").foreach { col =>
- checkException(s"alter table $tableName drop column
$col")("cannot support apply changes for primaryKey/CombineKey/partitionKey")
- checkException(s"alter table $tableName rename column $col to
${col + col}")("cannot support apply changes for
primaryKey/CombineKey/partitionKey")
- }
- // check duplicate add or rename
- // keep consistent with hive, column names insensitive
- checkExceptions(s"alter table $tableName rename column col0 to
col9")(Seq("cannot rename column: col0 to a existing name",
- "Cannot rename column, because col9 already exists in root"))
- checkExceptions(s"alter table $tableName rename column col0 to
COL9")(Seq("cannot rename column: col0 to a existing name", "Cannot rename
column, because COL9 already exists in root"))
- checkExceptions(s"alter table $tableName add columns(col9 int
first)")(Seq("cannot add column: col9 which already exist", "Cannot add column,
because col9 already exists in root"))
- checkExceptions(s"alter table $tableName add columns(COL9 int
first)")(Seq("cannot add column: COL9 which already exist", "Cannot add column,
because COL9 already exists in root"))
- // test add comment for columns / alter columns comment
- spark.sql(s"alter table $tableName add columns(col1_new int
comment 'add new columns col1_new after id' after id)")
- spark.sql(s"alter table $tableName alter column col9 comment 'col9
desc'")
- val schema =
spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName)).schema
- assert(schema.filter(p =>
p.name.equals("col1_new")).get(0).getComment().get == "add new columns col1_new
after id")
- assert(schema.filter(p =>
p.name.equals("col9")).get(0).getComment().get == "col9 desc")
- // test change column type float to double
- spark.sql(s"alter table $tableName alter column col2 type double")
- spark.sql(s"select id, col1_new, col2 from $tableName where id = 1
or id = 2 order by id").show(false)
- spark.sql(
- s"""
- | insert into $tableName values
- |
(1,3,1,11,100001,101.01,1001.0001,100001.0001,'a000001','2021-12-25','2021-12-25
12:01:01',true,'a01','2021-12-25'),
- |
(6,6,5,15,100005,105.05,1005.0005,100005.0005,'a000005','2021-12-26','2021-12-26
12:05:05',false,'a05','2021-12-26')
- |""".stripMargin)
-
- spark.sql(s"select id, col1_new, col2 from $tableName where id = 1
or id = 6 or id = 2 order by id").show(false)
- // try schedule compact
- if (tableType == "mor") spark.sql(s"schedule compaction on
$tableName")
- // test change column type decimal(10,4) 为decimal(18,8)
- spark.sql(s"alter table $tableName alter column col4 type
decimal(18, 8)")
- spark.sql(s"select id, col1_new, col2 from $tableName where id = 1
or id = 2 order by id").show(false)
- spark.sql(
- s"""
- | insert into $tableName values
- |
(5,6,5,15,100005,105.05,1005.0005,100005.0005,'a000005','2021-12-26','2021-12-26
12:05:05',false,'a05','2021-12-26')
- |""".stripMargin)
-
- spark.sql(s"select id, col1_new, col4 from $tableName where id = 1
or id = 6 or id = 2 order by id").show(false)
- // test change column type float to double
- spark.sql(s"alter table $tableName alter column col2 type string")
- spark.sql(s"select id, col1_new, col2 from $tableName where id = 1
or id = 2 order by id").show(false)
- spark.sql(
- s"""
- | insert into $tableName values
- |
(1,3,1,11,100001,'101.01',1001.0001,100001.0001,'a000001','2021-12-25','2021-12-25
12:01:01',true,'a01','2021-12-25'),
- |
(6,6,5,15,100005,'105.05',1005.0005,100005.0005,'a000005','2021-12-26','2021-12-26
12:05:05',false,'a05','2021-12-26')
- |""".stripMargin)
-
- spark.sql(s"select id, col1_new, col2 from $tableName where id = 1
or id = 6 or id = 2 order by id").show(false)
- // try schedule compact
- if (tableType == "mor") spark.sql(s"schedule compaction on
$tableName")
- // if tableType is mor, check compaction
- if (tableType == "mor") {
- val compactionRows = spark.sql(s"show compaction on $tableName
limit 10").collect()
- val timestamps = compactionRows.map(_.getString(0))
- assertResult(2)(timestamps.length)
- spark.sql(s"run compaction on $tableName at ${timestamps(1)}")
- spark.sql(s"run compaction on $tableName at ${timestamps(0)}")
- }
- spark.sql(
- s"""
- | insert into $tableName values
- |
(1,3,1,11,100001,'101.01',1001.0001,100009.0001,'a000008','2021-12-25','2021-12-25
12:01:01',true,'a01','2021-12-25'),
- |
(11,3,1,11,100001,'101.01',1001.0001,100011.0001,'a000008','2021-12-25','2021-12-25
12:01:01',true,'a01','2021-12-25'),
- |
(6,6,5,15,100005,'105.05',1005.0005,100007.0005,'a000009','2021-12-26','2021-12-26
12:05:05',false,'a05','2021-12-26')
- |""".stripMargin)
-
- spark.sql(s"select id, col1_new, col2 from $tableName where id = 1
or id = 6 or id = 2 or id = 11 order by id").show(false)
+ val tableName = generateTableName
+ val tablePath = s"${new Path(tmp.getCanonicalPath,
tableName).toUri.toString}"
+ if (HoodieSparkUtils.gteqSpark3_1) {
+ spark.sql("set hoodie.schema.on.read.enable=true")
+ // NOTE: This is required since as this tests use type coercions
which were only permitted in Spark 2.x
+ // and are disallowed now by default in Spark 3.x
+ spark.sql("set spark.sql.storeAssignmentPolicy=legacy")
+ createAndPreparePartitionTable(spark, tableName, tablePath,
tableType)
+
+ // test set properties
+ spark.sql(s"alter table $tableName set tblproperties(comment='it is
a hudi table', 'key1'='value1', 'key2'='value2')")
+ val meta =
spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
+ assert(meta.comment.get.equals("it is a hudi table"))
+ assert(Seq("key1", "key2").filter(meta.properties.contains(_)).size
== 2)
+ // test unset properties
+ spark.sql(s"alter table $tableName unset tblproperties(comment,
'key1', 'key2')")
+ val unsetMeta =
spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
+ assert(Seq("key1",
"key2").filter(unsetMeta.properties.contains(_)).size == 0)
+ assert(unsetMeta.comment.isEmpty)
+ // test forbidden operation.
+ checkException(s"Alter table $tableName add columns(col_new1 int
first)")("forbid adjust top-level columns position by using through first
syntax")
+ HoodieRecord.HOODIE_META_COLUMNS.subList(0,
HoodieRecord.HOODIE_META_COLUMNS.size - 2).asScala.foreach {f =>
+ checkException(s"Alter table $tableName add columns(col_new1 int
after $f)")("forbid adjust the position of ordinary columns between meta
columns")
}
+ Seq("id", "comb", "par").foreach { col =>
+ checkException(s"alter table $tableName drop column $col")("cannot
support apply changes for primaryKey/CombineKey/partitionKey")
+ checkException(s"alter table $tableName rename column $col to
${col + col}")("cannot support apply changes for
primaryKey/CombineKey/partitionKey")
+ }
+ // check duplicate add or rename
+ // keep consistent with hive, column names insensitive
+ checkExceptions(s"alter table $tableName rename column col0 to
col9")(Seq("cannot rename column: col0 to a existing name",
+ "Cannot rename column, because col9 already exists in root"))
+ checkExceptions(s"alter table $tableName rename column col0 to
COL9")(Seq("cannot rename column: col0 to a existing name", "Cannot rename
column, because COL9 already exists in root"))
+ checkExceptions(s"alter table $tableName add columns(col9 int
first)")(Seq("cannot add column: col9 which already exist", "Cannot add column,
because col9 already exists in root"))
+ checkExceptions(s"alter table $tableName add columns(COL9 int
first)")(Seq("cannot add column: COL9 which already exist", "Cannot add column,
because COL9 already exists in root"))
+ // test add comment for columns / alter columns comment
+ spark.sql(s"alter table $tableName add columns(col1_new int comment
'add new columns col1_new after id' after id)")
+ spark.sql(s"alter table $tableName alter column col9 comment 'col9
desc'")
+ val schema =
spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName)).schema
+ assert(schema.filter(p =>
p.name.equals("col1_new")).get(0).getComment().get == "add new columns col1_new
after id")
+ assert(schema.filter(p =>
p.name.equals("col9")).get(0).getComment().get == "col9 desc")
+ // test change column type float to double
+ spark.sql(s"alter table $tableName alter column col2 type double")
+ spark.sql(s"select id, col1_new, col2 from $tableName where id = 1
or id = 2 order by id").show(false)
+ spark.sql(
+ s"""
+ | insert into $tableName values
+ |
(1,3,1,11,100001,101.01,1001.0001,100001.0001,'a000001','2021-12-25','2021-12-25
12:01:01',true,'a01','2021-12-25'),
+ |
(6,6,5,15,100005,105.05,1005.0005,100005.0005,'a000005','2021-12-26','2021-12-26
12:05:05',false,'a05','2021-12-26')
+ |""".stripMargin)
+
+ spark.sql(s"select id, col1_new, col2 from $tableName where id = 1
or id = 6 or id = 2 order by id").show(false)
+ // try schedule compact
+ if (tableType == "mor") spark.sql(s"schedule compaction on
$tableName")
+ // test change column type decimal(10,4) 为decimal(18,8)
+ spark.sql(s"alter table $tableName alter column col4 type
decimal(18, 8)")
+ spark.sql(s"select id, col1_new, col2 from $tableName where id = 1
or id = 2 order by id").show(false)
+ spark.sql(
+ s"""
+ | insert into $tableName values
+ |
(5,6,5,15,100005,105.05,1005.0005,100005.0005,'a000005','2021-12-26','2021-12-26
12:05:05',false,'a05','2021-12-26')
+ |""".stripMargin)
+
+ spark.sql(s"select id, col1_new, col4 from $tableName where id = 1
or id = 6 or id = 2 order by id").show(false)
+ // test change column type float to double
+ spark.sql(s"alter table $tableName alter column col2 type string")
+ spark.sql(s"select id, col1_new, col2 from $tableName where id = 1
or id = 2 order by id").show(false)
+ spark.sql(
+ s"""
+ | insert into $tableName values
+ |
(1,3,1,11,100001,'101.01',1001.0001,100001.0001,'a000001','2021-12-25','2021-12-25
12:01:01',true,'a01','2021-12-25'),
+ |
(6,6,5,15,100005,'105.05',1005.0005,100005.0005,'a000005','2021-12-26','2021-12-26
12:05:05',false,'a05','2021-12-26')
+ |""".stripMargin)
+
+ spark.sql(s"select id, col1_new, col2 from $tableName where id = 1
or id = 6 or id = 2 order by id").show(false)
+ // try schedule compact
+ if (tableType == "mor") spark.sql(s"schedule compaction on
$tableName")
+ // if tableType is mor, check compaction
+ if (tableType == "mor") {
+ val compactionRows = spark.sql(s"show compaction on $tableName
limit 10").collect()
+ val timestamps = compactionRows.map(_.getString(0))
+ assertResult(2)(timestamps.length)
+ spark.sql(s"run compaction on $tableName at ${timestamps(1)}")
+ spark.sql(s"run compaction on $tableName at ${timestamps(0)}")
+ }
+ spark.sql(
+ s"""
+ | insert into $tableName values
+ |
(1,3,1,11,100001,'101.01',1001.0001,100009.0001,'a000008','2021-12-25','2021-12-25
12:01:01',true,'a01','2021-12-25'),
+ |
(11,3,1,11,100001,'101.01',1001.0001,100011.0001,'a000008','2021-12-25','2021-12-25
12:01:01',true,'a01','2021-12-25'),
+ |
(6,6,5,15,100005,'105.05',1005.0005,100007.0005,'a000009','2021-12-26','2021-12-26
12:05:05',false,'a05','2021-12-26')
+ |""".stripMargin)
+
+ spark.sql(s"select id, col1_new, col2 from $tableName where id = 1
or id = 6 or id = 2 or id = 11 order by id").show(false)
}
}
})
@@ -337,50 +333,48 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
test("Test Chinese table ") {
withRecordType()(withTempDir { tmp =>
Seq("cow", "mor").foreach { tableType =>
- withSQLConf("hoodie.sql.insert.mode" -> "upsert") {
- val tableName = generateTableName
- val tablePath = s"${new Path(tmp.getCanonicalPath,
tableName).toUri.toString}"
- if (HoodieSparkUtils.gteqSpark3_1) {
- spark.sql("set hoodie.schema.on.read.enable=true")
- spark.sql(
- s"""
- |create table $tableName (
- | id int, comb int, `名字` string, col9 string, `成绩` int, `身高`
float, `体重` double, `上次更新时间` date, par date
- |) using hudi
- | location '$tablePath'
- | options (
- | type = '$tableType',
- | primaryKey = 'id',
- | preCombineField = 'comb'
- | )
- | partitioned by (par)
+ val tableName = generateTableName
+ val tablePath = s"${new Path(tmp.getCanonicalPath,
tableName).toUri.toString}"
+ if (HoodieSparkUtils.gteqSpark3_1) {
+ spark.sql("set hoodie.schema.on.read.enable=true")
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int, comb int, `名字` string, col9 string, `成绩` int, `身高`
float, `体重` double, `上次更新时间` date, par date
+ |) using hudi
+ | location '$tablePath'
+ | options (
+ | type = '$tableType',
+ | primaryKey = 'id',
+ | preCombineField = 'comb'
+ | )
+ | partitioned by (par)
""".stripMargin)
- spark.sql(
- s"""
- | insert into $tableName values
- | (1,3,'李明', '读书', 100,180.0001,99.0001,'2021-12-25',
'2021-12-26')
- |""".stripMargin)
- spark.sql(s"alter table $tableName rename column col9 to
`爱好_Best`")
-
- // update current table to produce log files for mor
- spark.sql(
- s"""
- | insert into $tableName values
- | (1,3,'李明', '读书', 100,180.0001,99.0001,'2021-12-26',
'2021-12-26')
- |""".stripMargin)
-
- // alter date to string
- spark.sql(s"alter table $tableName alter column `上次更新时间` type
string ")
- checkAnswer(spark.sql(s"select `上次更新时间` from
$tableName").collect())(
- Seq("2021-12-26")
- )
- // alter string to date
- spark.sql(s"alter table $tableName alter column `上次更新时间` type date
")
- spark.sql(s"select `上次更新时间` from $tableName").collect()
- checkAnswer(spark.sql(s"select `上次更新时间` from
$tableName").collect())(
- Seq(java.sql.Date.valueOf("2021-12-26"))
- )
- }
+ spark.sql(
+ s"""
+ | insert into $tableName values
+ | (1,3,'李明', '读书', 100,180.0001,99.0001,'2021-12-25',
'2021-12-26')
+ |""".stripMargin)
+ spark.sql(s"alter table $tableName rename column col9 to `爱好_Best`")
+
+ // update current table to produce log files for mor
+ spark.sql(
+ s"""
+ | insert into $tableName values
+ | (1,3,'李明', '读书', 100,180.0001,99.0001,'2021-12-26',
'2021-12-26')
+ |""".stripMargin)
+
+ // alter date to string
+ spark.sql(s"alter table $tableName alter column `上次更新时间` type string
")
+ checkAnswer(spark.sql(s"select `上次更新时间` from $tableName").collect())(
+ Seq("2021-12-26")
+ )
+ // alter string to date
+ spark.sql(s"alter table $tableName alter column `上次更新时间` type date ")
+ spark.sql(s"select `上次更新时间` from $tableName").collect()
+ checkAnswer(spark.sql(s"select `上次更新时间` from $tableName").collect())(
+ Seq(java.sql.Date.valueOf("2021-12-26"))
+ )
}
}
})
@@ -518,88 +512,86 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
test("Test alter column with complex schema") {
withRecordType()(withTempDir { tmp =>
Seq("mor").foreach { tableType =>
- withSQLConf("hoodie.sql.insert.mode" -> "upsert") {
- val tableName = generateTableName
- val tablePath = s"${new Path(tmp.getCanonicalPath,
tableName).toUri.toString}"
- if (HoodieSparkUtils.gteqSpark3_1) {
- spark.sql("set hoodie.schema.on.read.enable=true")
- spark.sql(
- s"""
- |create table $tableName (
- | id int,
- | name string,
- | members map<String, struct<n:string, a:int>>,
- | user struct<name:string, age:int, score: int>,
- | ts long
- |) using hudi
- | location '$tablePath'
- | options (
- | type = '$tableType',
- | primaryKey = 'id',
- | preCombineField = 'ts'
- | )
+ val tableName = generateTableName
+ val tablePath = s"${new Path(tmp.getCanonicalPath,
tableName).toUri.toString}"
+ if (HoodieSparkUtils.gteqSpark3_1) {
+ spark.sql("set hoodie.schema.on.read.enable=true")
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | members map<String, struct<n:string, a:int>>,
+ | user struct<name:string, age:int, score: int>,
+ | ts long
+ |) using hudi
+ | location '$tablePath'
+ | options (
+ | type = '$tableType',
+ | primaryKey = 'id',
+ | preCombineField = 'ts'
+ | )
""".stripMargin)
- spark.sql(s"alter table $tableName alter column members.value.a
first")
-
- spark.sql(s"insert into ${tableName} values(1, 'jack', map('k1',
struct(100, 'v1'), 'k2', struct(200, 'v2')), struct('jackStruct', 29, 100),
1000)")
-
- // rename column
- spark.sql(s"alter table ${tableName} rename column user to userx")
-
- checkAnswer(spark.sql(s"select ts, userx.score, id, userx.age,
name from ${tableName}").collect())(
- Seq(1000, 100, 1, 29, "jack")
- )
-
- // drop column
- spark.sql(s"alter table ${tableName} drop columns(name,
userx.name, userx.score)")
-
- spark.sql(s"select * from ${tableName}").show(false)
-
- // add cols back, and adjust cols position
- spark.sql(s"alter table ${tableName} add columns(name string
comment 'add name back' after userx," +
- s" userx.name string comment 'add userx.name back' first,
userx.score int comment 'add userx.score back' after age)")
-
- // query new columns: name, userx.name, userx.score, those field
should not be read.
- checkAnswer(spark.sql(s"select name, userx.name, userx.score from
${tableName}").collect())(Seq(null, null, null))
-
- // insert again
- spark.sql(s"insert into ${tableName} values(2 , map('k1',
struct(100, 'v1'), 'k2', struct(200, 'v2')), struct('jackStructNew', 291 ,
101), 'jacknew', 1000)")
-
- // check again
- checkAnswer(spark.sql(s"select name, userx.name as uxname,
userx.score as uxs from ${tableName} order by id").collect())(
- Seq(null, null, null),
- Seq("jacknew", "jackStructNew", 101))
-
-
- spark.sql(s"alter table ${tableName} alter column userx.age type
long")
-
- spark.sql(s"select userx.age, id, name from ${tableName}")
- checkAnswer(spark.sql(s"select userx.age, id, name from
${tableName} order by id").collect())(
- Seq(29, 1, null),
- Seq(291, 2, "jacknew"))
- // test map value type change
- spark.sql(s"alter table ${tableName} add columns(mxp map<String,
int>)")
- spark.sql(s"insert into ${tableName} values(2, map('k1',
struct(100, 'v1'), 'k2', struct(200, 'v2')), struct('jackStructNew', 291 ,
101), 'jacknew', 1000, map('t1', 9))")
- spark.sql(s"alter table ${tableName} alter column mxp.value type
double")
- spark.sql(s"insert into ${tableName} values(2, map('k1',
struct(100, 'v1'), 'k2', struct(200, 'v2')), struct('jackStructNew', 291 ,
101), 'jacknew', 1000, map('t1', 10))")
- spark.sql(s"select * from $tableName").show(false)
- checkAnswer(spark.sql(s"select mxp from ${tableName} order by
id").collect())(
- Seq(null),
- Seq(Map("t1" -> 10.0d))
- )
- spark.sql(s"alter table ${tableName} rename column members to mem")
- spark.sql(s"alter table ${tableName} rename column mem.value.n to
nn")
- spark.sql(s"alter table ${tableName} rename column userx to us")
- spark.sql(s"alter table ${tableName} rename column us.age to age1")
-
- spark.sql(s"insert into ${tableName} values(2, map('k1',
struct(100, 'v1'), 'k2', struct(200, 'v2')), struct('jackStructNew', 291 ,
101), 'jacknew', 1000, map('t1', 10))")
- spark.sql(s"select mem.value.nn, us.age1 from $tableName order by
id").show()
- checkAnswer(spark.sql(s"select mem.value.nn, us.age1 from
$tableName order by id").collect())(
- Seq(null, 29),
- Seq(null, 291)
- )
- }
+ spark.sql(s"alter table $tableName alter column members.value.a
first")
+
+ spark.sql(s"insert into ${tableName} values(1, 'jack', map('k1',
struct(100, 'v1'), 'k2', struct(200, 'v2')), struct('jackStruct', 29, 100),
1000)")
+
+ // rename column
+ spark.sql(s"alter table ${tableName} rename column user to userx")
+
+ checkAnswer(spark.sql(s"select ts, userx.score, id, userx.age, name
from ${tableName}").collect())(
+ Seq(1000, 100, 1, 29, "jack")
+ )
+
+ // drop column
+ spark.sql(s"alter table ${tableName} drop columns(name, userx.name,
userx.score)")
+
+ spark.sql(s"select * from ${tableName}").show(false)
+
+ // add cols back, and adjust cols position
+ spark.sql(s"alter table ${tableName} add columns(name string comment
'add name back' after userx," +
+ s" userx.name string comment 'add userx.name back' first,
userx.score int comment 'add userx.score back' after age)")
+
+ // query new columns: name, userx.name, userx.score, those field
should not be read.
+ checkAnswer(spark.sql(s"select name, userx.name, userx.score from
${tableName}").collect())(Seq(null, null, null))
+
+ // insert again
+ spark.sql(s"insert into ${tableName} values(2 , map('k1',
struct(100, 'v1'), 'k2', struct(200, 'v2')), struct('jackStructNew', 291 ,
101), 'jacknew', 1000)")
+
+ // check again
+ checkAnswer(spark.sql(s"select name, userx.name as uxname,
userx.score as uxs from ${tableName} order by id").collect())(
+ Seq(null, null, null),
+ Seq("jacknew", "jackStructNew", 101))
+
+
+ spark.sql(s"alter table ${tableName} alter column userx.age type
long")
+
+ spark.sql(s"select userx.age, id, name from ${tableName}")
+ checkAnswer(spark.sql(s"select userx.age, id, name from ${tableName}
order by id").collect())(
+ Seq(29, 1, null),
+ Seq(291, 2, "jacknew"))
+ // test map value type change
+ spark.sql(s"alter table ${tableName} add columns(mxp map<String,
int>)")
+ spark.sql(s"insert into ${tableName} values(2, map('k1', struct(100,
'v1'), 'k2', struct(200, 'v2')), struct('jackStructNew', 291 , 101), 'jacknew',
1000, map('t1', 9))")
+ spark.sql(s"alter table ${tableName} alter column mxp.value type
double")
+ spark.sql(s"insert into ${tableName} values(2, map('k1', struct(100,
'v1'), 'k2', struct(200, 'v2')), struct('jackStructNew', 291 , 101), 'jacknew',
1000, map('t1', 10))")
+ spark.sql(s"select * from $tableName").show(false)
+ checkAnswer(spark.sql(s"select mxp from ${tableName} order by
id").collect())(
+ Seq(null),
+ Seq(Map("t1" -> 10.0d))
+ )
+ spark.sql(s"alter table ${tableName} rename column members to mem")
+ spark.sql(s"alter table ${tableName} rename column mem.value.n to
nn")
+ spark.sql(s"alter table ${tableName} rename column userx to us")
+ spark.sql(s"alter table ${tableName} rename column us.age to age1")
+
+ spark.sql(s"insert into ${tableName} values(2, map('k1', struct(100,
'v1'), 'k2', struct(200, 'v2')), struct('jackStructNew', 291 , 101), 'jacknew',
1000, map('t1', 10))")
+ spark.sql(s"select mem.value.nn, us.age1 from $tableName order by
id").show()
+ checkAnswer(spark.sql(s"select mem.value.nn, us.age1 from $tableName
order by id").collect())(
+ Seq(null, 29),
+ Seq(null, 291)
+ )
}
}
})
@@ -825,37 +817,35 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
test("Test FLOAT to DECIMAL schema evolution (lost in scale)") {
Seq("cow", "mor").foreach { tableType =>
withTempDir { tmp =>
- withSQLConf("hoodie.sql.insert.mode" -> "upsert") {
- // Using INMEMORY index for mor table so that log files will be
created instead of parquet
- val tableName = generateTableName
- if (HoodieSparkUtils.gteqSpark3_1) {
- spark.sql(
- s"""
- |create table $tableName (
- | id int,
- | name string,
- | price float,
- | ts long
- |) using hudi
- | location '${tmp.getCanonicalPath}'
- | tblproperties (
- | primaryKey = 'id',
- | type = '$tableType',
- | preCombineField = 'ts'
- | ${if (tableType.equals("mor")) ", hoodie.index.type =
'INMEMORY'" else ""}
- | )
+ // Using INMEMORY index for mor table so that log files will be
created instead of parquet
+ val tableName = generateTableName
+ if (HoodieSparkUtils.gteqSpark3_1) {
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price float,
+ | ts long
+ |) using hudi
+ | location '${tmp.getCanonicalPath}'
+ | tblproperties (
+ | primaryKey = 'id',
+ | type = '$tableType',
+ | preCombineField = 'ts'
+ | ${if (tableType.equals("mor")) ", hoodie.index.type =
'INMEMORY'" else ""}
+ | )
""".stripMargin)
- spark.sql(s"insert into $tableName values (1, 'a1', 10.024, 1000)")
+ spark.sql(s"insert into $tableName values (1, 'a1', 10.024, 1000)")
-
assertResult(tableType.equals("mor"))(DataSourceTestUtils.isLogFileOnly(tmp.getCanonicalPath))
+
assertResult(tableType.equals("mor"))(DataSourceTestUtils.isLogFileOnly(tmp.getCanonicalPath))
- spark.sql("set hoodie.schema.on.read.enable=true")
- spark.sql(s"alter table $tableName alter column price type
decimal(4, 2)")
+ spark.sql("set hoodie.schema.on.read.enable=true")
+ spark.sql(s"alter table $tableName alter column price type
decimal(4, 2)")
- // Not checking answer as this is an unsafe casting operation,
just need to make sure that error is not thrown
- spark.sql(s"select id, name, cast(price as string), ts from
$tableName")
- }
+ // Not checking answer as this is an unsafe casting operation, just
need to make sure that error is not thrown
+ spark.sql(s"select id, name, cast(price as string), ts from
$tableName")
}
}
}
@@ -864,65 +854,63 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
test("Test DOUBLE to DECIMAL schema evolution (lost in scale)") {
Seq("cow", "mor").foreach { tableType =>
withTempDir { tmp =>
- withSQLConf("hoodie.sql.insert.mode" -> "upsert") {
- // Using INMEMORY index for mor table so that log files will be
created instead of parquet
- val tableName = generateTableName
- if (HoodieSparkUtils.gteqSpark3_1) {
- spark.sql(
- s"""
- |create table $tableName (
- | id int,
- | name string,
- | price double,
- | ts long
- |) using hudi
- | location '${tmp.getCanonicalPath}'
- | tblproperties (
- | primaryKey = 'id',
- | type = '$tableType',
- | preCombineField = 'ts'
- | ${if (tableType.equals("mor")) ", hoodie.index.type =
'INMEMORY'" else ""}
- | )
+ // Using INMEMORY index for mor table so that log files will be
created instead of parquet
+ val tableName = generateTableName
+ if (HoodieSparkUtils.gteqSpark3_1) {
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | location '${tmp.getCanonicalPath}'
+ | tblproperties (
+ | primaryKey = 'id',
+ | type = '$tableType',
+ | preCombineField = 'ts'
+ | ${if (tableType.equals("mor")) ", hoodie.index.type =
'INMEMORY'" else ""}
+ | )
""".stripMargin)
- spark.sql(s"insert into $tableName values " +
- // testing the rounding behaviour to ensure that HALF_UP is used
for positive values
- "(1, 'a1', 10.024, 1000)," +
- "(2, 'a2', 10.025, 1000)," +
- "(3, 'a3', 10.026, 1000)," +
- // testing the rounding behaviour to ensure that HALF_UP is used
for negative values
- "(4, 'a4', -10.024, 1000)," +
- "(5, 'a5', -10.025, 1000)," +
- "(6, 'a6', -10.026, 1000)," +
- // testing the GENERAL rounding behaviour (HALF_UP and HALF_EVEN
will retain the same result)
- "(7, 'a7', 10.034, 1000)," +
- "(8, 'a8', 10.035, 1000)," +
- "(9, 'a9', 10.036, 1000)," +
- // testing the GENERAL rounding behaviour (HALF_UP and HALF_EVEN
will retain the same result)
- "(10, 'a10', -10.034, 1000)," +
- "(11, 'a11', -10.035, 1000)," +
- "(12, 'a12', -10.036, 1000)")
-
-
assertResult(tableType.equals("mor"))(DataSourceTestUtils.isLogFileOnly(tmp.getCanonicalPath))
-
- spark.sql("set hoodie.schema.on.read.enable=true")
- spark.sql(s"alter table $tableName alter column price type
decimal(4, 2)")
-
- checkAnswer(s"select id, name, cast(price as string), ts from
$tableName order by id")(
- Seq(1, "a1", "10.02", 1000),
- Seq(2, "a2", "10.03", 1000),
- Seq(3, "a3", "10.03", 1000),
- Seq(4, "a4", "-10.02", 1000),
- Seq(5, "a5", "-10.03", 1000),
- Seq(6, "a6", "-10.03", 1000),
- Seq(7, "a7", "10.03", 1000),
- Seq(8, "a8", "10.04", 1000),
- Seq(9, "a9", "10.04", 1000),
- Seq(10, "a10", "-10.03", 1000),
- Seq(11, "a11", "-10.04", 1000),
- Seq(12, "a12", "-10.04", 1000)
- )
- }
+ spark.sql(s"insert into $tableName values " +
+ // testing the rounding behaviour to ensure that HALF_UP is used
for positive values
+ "(1, 'a1', 10.024, 1000)," +
+ "(2, 'a2', 10.025, 1000)," +
+ "(3, 'a3', 10.026, 1000)," +
+ // testing the rounding behaviour to ensure that HALF_UP is used
for negative values
+ "(4, 'a4', -10.024, 1000)," +
+ "(5, 'a5', -10.025, 1000)," +
+ "(6, 'a6', -10.026, 1000)," +
+ // testing the GENERAL rounding behaviour (HALF_UP and HALF_EVEN
will retain the same result)
+ "(7, 'a7', 10.034, 1000)," +
+ "(8, 'a8', 10.035, 1000)," +
+ "(9, 'a9', 10.036, 1000)," +
+ // testing the GENERAL rounding behaviour (HALF_UP and HALF_EVEN
will retain the same result)
+ "(10, 'a10', -10.034, 1000)," +
+ "(11, 'a11', -10.035, 1000)," +
+ "(12, 'a12', -10.036, 1000)")
+
+
assertResult(tableType.equals("mor"))(DataSourceTestUtils.isLogFileOnly(tmp.getCanonicalPath))
+
+ spark.sql("set hoodie.schema.on.read.enable=true")
+ spark.sql(s"alter table $tableName alter column price type
decimal(4, 2)")
+
+ checkAnswer(s"select id, name, cast(price as string), ts from
$tableName order by id")(
+ Seq(1, "a1", "10.02", 1000),
+ Seq(2, "a2", "10.03", 1000),
+ Seq(3, "a3", "10.03", 1000),
+ Seq(4, "a4", "-10.02", 1000),
+ Seq(5, "a5", "-10.03", 1000),
+ Seq(6, "a6", "-10.03", 1000),
+ Seq(7, "a7", "10.03", 1000),
+ Seq(8, "a8", "10.04", 1000),
+ Seq(9, "a9", "10.04", 1000),
+ Seq(10, "a10", "-10.03", 1000),
+ Seq(11, "a11", "-10.04", 1000),
+ Seq(12, "a12", "-10.04", 1000)
+ )
}
}
}
@@ -931,65 +919,63 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
test("Test STRING to DECIMAL schema evolution (lost in scale)") {
Seq("cow", "mor").foreach { tableType =>
withTempDir { tmp =>
- withSQLConf("hoodie.sql.insert.mode" -> "upsert") {
- // Using INMEMORY index for mor table so that log files will be
created instead of parquet
- val tableName = generateTableName
- if (HoodieSparkUtils.gteqSpark3_1) {
- spark.sql(
- s"""
- |create table $tableName (
- | id int,
- | name string,
- | price string,
- | ts long
- |) using hudi
- | location '${tmp.getCanonicalPath}'
- | tblproperties (
- | primaryKey = 'id',
- | type = '$tableType',
- | preCombineField = 'ts'
- | ${if (tableType.equals("mor")) ", hoodie.index.type =
'INMEMORY'" else ""}
- | )
+ // Using INMEMORY index for mor table so that log files will be
created instead of parquet
+ val tableName = generateTableName
+ if (HoodieSparkUtils.gteqSpark3_1) {
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price string,
+ | ts long
+ |) using hudi
+ | location '${tmp.getCanonicalPath}'
+ | tblproperties (
+ | primaryKey = 'id',
+ | type = '$tableType',
+ | preCombineField = 'ts'
+ | ${if (tableType.equals("mor")) ", hoodie.index.type =
'INMEMORY'" else ""}
+ | )
""".stripMargin)
- spark.sql(s"insert into $tableName values " +
- // testing the rounding behaviour to ensure that HALF_UP is used
for positive values
- "(1, 'a1', '10.024', 1000)," +
- "(2, 'a2', '10.025', 1000)," +
- "(3, 'a3', '10.026', 1000)," +
- // testing the rounding behaviour to ensure that HALF_UP is used
for negative values
- "(4, 'a4', '-10.024', 1000)," +
- "(5, 'a5', '-10.025', 1000)," +
- "(6, 'a6', '-10.026', 1000)," +
- // testing the GENERAL rounding behaviour (HALF_UP and HALF_EVEN
will retain the same result)
- "(7, 'a7', '10.034', 1000)," +
- "(8, 'a8', '10.035', 1000)," +
- "(9, 'a9', '10.036', 1000)," +
- // testing the GENERAL rounding behaviour (HALF_UP and HALF_EVEN
will retain the same result)
- "(10, 'a10', '-10.034', 1000)," +
- "(11, 'a11', '-10.035', 1000)," +
- "(12, 'a12', '-10.036', 1000)")
-
-
assertResult(tableType.equals("mor"))(DataSourceTestUtils.isLogFileOnly(tmp.getCanonicalPath))
-
- spark.sql("set hoodie.schema.on.read.enable=true")
- spark.sql(s"alter table $tableName alter column price type
decimal(4, 2)")
-
- checkAnswer(s"select id, name, cast(price as string), ts from
$tableName order by id")(
- Seq(1, "a1", "10.02", 1000),
- Seq(2, "a2", "10.03", 1000),
- Seq(3, "a3", "10.03", 1000),
- Seq(4, "a4", "-10.02", 1000),
- Seq(5, "a5", "-10.03", 1000),
- Seq(6, "a6", "-10.03", 1000),
- Seq(7, "a7", "10.03", 1000),
- Seq(8, "a8", "10.04", 1000),
- Seq(9, "a9", "10.04", 1000),
- Seq(10, "a10", "-10.03", 1000),
- Seq(11, "a11", "-10.04", 1000),
- Seq(12, "a12", "-10.04", 1000)
- )
- }
+ spark.sql(s"insert into $tableName values " +
+ // testing the rounding behaviour to ensure that HALF_UP is used
for positive values
+ "(1, 'a1', '10.024', 1000)," +
+ "(2, 'a2', '10.025', 1000)," +
+ "(3, 'a3', '10.026', 1000)," +
+ // testing the rounding behaviour to ensure that HALF_UP is used
for negative values
+ "(4, 'a4', '-10.024', 1000)," +
+ "(5, 'a5', '-10.025', 1000)," +
+ "(6, 'a6', '-10.026', 1000)," +
+ // testing the GENERAL rounding behaviour (HALF_UP and HALF_EVEN
will retain the same result)
+ "(7, 'a7', '10.034', 1000)," +
+ "(8, 'a8', '10.035', 1000)," +
+ "(9, 'a9', '10.036', 1000)," +
+ // testing the GENERAL rounding behaviour (HALF_UP and HALF_EVEN
will retain the same result)
+ "(10, 'a10', '-10.034', 1000)," +
+ "(11, 'a11', '-10.035', 1000)," +
+ "(12, 'a12', '-10.036', 1000)")
+
+
assertResult(tableType.equals("mor"))(DataSourceTestUtils.isLogFileOnly(tmp.getCanonicalPath))
+
+ spark.sql("set hoodie.schema.on.read.enable=true")
+ spark.sql(s"alter table $tableName alter column price type
decimal(4, 2)")
+
+ checkAnswer(s"select id, name, cast(price as string), ts from
$tableName order by id")(
+ Seq(1, "a1", "10.02", 1000),
+ Seq(2, "a2", "10.03", 1000),
+ Seq(3, "a3", "10.03", 1000),
+ Seq(4, "a4", "-10.02", 1000),
+ Seq(5, "a5", "-10.03", 1000),
+ Seq(6, "a6", "-10.03", 1000),
+ Seq(7, "a7", "10.03", 1000),
+ Seq(8, "a8", "10.04", 1000),
+ Seq(9, "a9", "10.04", 1000),
+ Seq(10, "a10", "-10.03", 1000),
+ Seq(11, "a11", "-10.04", 1000),
+ Seq(12, "a12", "-10.04", 1000)
+ )
}
}
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestTimeTravelTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestTimeTravelTable.scala
index 22ea2d09b57..ed67b0caf9a 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestTimeTravelTable.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestTimeTravelTable.scala
@@ -40,7 +40,9 @@ class TestTimeTravelTable extends HoodieSparkSqlTestBase {
| )
| location '${tmp.getCanonicalPath}/$tableName1'
""".stripMargin)
+
spark.sql(s"insert into $tableName1 values(1, 'a1', 10, 1000)")
+
val metaClient1 = HoodieTableMetaClient.builder()
.setBasePath(s"${tmp.getCanonicalPath}/$tableName1")
.setConf(spark.sessionState.newHadoopConf())
@@ -48,9 +50,8 @@ class TestTimeTravelTable extends HoodieSparkSqlTestBase {
val instant1 = metaClient1.getActiveTimeline.getAllCommitsTimeline
.lastInstant().get().getTimestamp
- withSQLConf("hoodie.sql.insert.mode" -> "upsert") {
- spark.sql(s"insert into $tableName1 values(1, 'a2', 20, 2000)")
- }
+
+ spark.sql(s"insert into $tableName1 values(1, 'a2', 20, 2000)")
checkAnswer(s"select id, name, price, ts from $tableName1")(
Seq(1, "a2", 20.0, 2000)
@@ -279,9 +280,8 @@ class TestTimeTravelTable extends HoodieSparkSqlTestBase {
val instant = metaClient.getActiveTimeline.getAllCommitsTimeline
.lastInstant().get().getTimestamp
- withSQLConf("hoodie.sql.insert.mode" -> "upsert") {
- spark.sql(s"insert into $tableName values(1, 'a2', 20, 2000)")
- }
+ spark.sql(s"insert into $tableName values(1, 'a2', 20, 2000)")
+
checkAnswer(s"select id, name, price, ts from $tableName distribute by
cast(rand() * 2 as int)")(
Seq(1, "a2", 20.0, 2000)
)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/analysis/TestHoodiePruneFileSourcePartitions.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/analysis/TestHoodiePruneFileSourcePartitions.scala
index 8585a8f8fe1..aac2a4027a2 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/analysis/TestHoodiePruneFileSourcePartitions.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/analysis/TestHoodiePruneFileSourcePartitions.scala
@@ -76,20 +76,14 @@ class TestHoodiePruneFileSourcePartitions extends
HoodieClientTestBase with Scal
|)
|LOCATION '$basePath/$tableName'
""".stripMargin)
- try {
- //needs to be upsert because bulk insert is creating 3 files in
nonpartitioned, even when bulk insert parallelism
- //is 1 and df is repartitioned
- spark.conf.set("hoodie.sql.insert.mode", "upsert")
- spark.sql(
- s"""
- |INSERT INTO $tableName VALUES
- | (1, 'a1', 10, 1000, "2021-01-05"),
- | (2, 'a2', 20, 2000, "2021-01-06"),
- | (3, 'a3', 30, 3000, "2021-01-07")
- """.stripMargin)
- } finally {
- spark.conf.unset("hoodie.sql.insert.mode")
- }
+
+ spark.sql(
+ s"""
+ |INSERT INTO $tableName VALUES
+ | (1, 'a1', 10, 1000, "2021-01-05"),
+ | (2, 'a2', 20, 2000, "2021-01-06"),
+ | (3, 'a3', 30, 3000, "2021-01-07")
+ """.stripMargin)
Seq("eager", "lazy").foreach { listingModeOverride =>
// We need to refresh the table to make sure Spark is re-processing the
query every time
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCleanProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCleanProcedure.scala
index e097c6f018e..e90a10e9f9a 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCleanProcedure.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCleanProcedure.scala
@@ -141,13 +141,10 @@ class TestCleanProcedure extends
HoodieSparkProcedureTestBase {
| )
|""".stripMargin)
- withSQLConf("hoodie.sql.insert.mode" -> "upsert") {
- //needs upsert because each commit will create a new filegroup so no
cleaning will happen
- spark.sql(s"insert into $tableName values(1, 'a1', 10, 1001)")
- spark.sql(s"insert into $tableName values(2, 'a2', 10, 1002)")
- spark.sql(s"insert into $tableName values(3, 'a3', 10, 1003)")
- spark.sql(s"insert into $tableName values(4, 'a4', 10, 1004)")
- }
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1001)")
+ spark.sql(s"insert into $tableName values(2, 'a2', 10, 1002)")
+ spark.sql(s"insert into $tableName values(3, 'a3', 10, 1003)")
+ spark.sql(s"insert into $tableName values(4, 'a4', 10, 1004)")
val result1 = spark.sql(s"call run_clean(table => '$tableName')")
.collect()
@@ -181,13 +178,11 @@ class TestCleanProcedure extends
HoodieSparkProcedureTestBase {
| preCombineField = 'ts'
| )
|""".stripMargin)
- withSQLConf("hoodie.sql.insert.mode" -> "upsert") {
- //needs upsert because each commit will create a new filegroup so no
cleaning will happen
- spark.sql(s"insert into $tableName values(1, 'a1', 10, 1001)")
- spark.sql(s"insert into $tableName values(2, 'a2', 10, 1002)")
- spark.sql(s"insert into $tableName values(3, 'a3', 10, 1003)")
- spark.sql(s"insert into $tableName values(4, 'a4', 10, 1004)")
- }
+
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1001)")
+ spark.sql(s"insert into $tableName values(2, 'a2', 10, 1002)")
+ spark.sql(s"insert into $tableName values(3, 'a3', 10, 1003)")
+ spark.sql(s"insert into $tableName values(4, 'a4', 10, 1004)")
val result1 = spark.sql(
s"""call run_clean(table => '$tableName', options => "
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
index 96880d6680d..7b8dc8f8a90 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
@@ -635,13 +635,9 @@ class TestClusteringProcedure extends
HoodieSparkProcedureTestBase {
// Test clustering with PARTITION_SELECTED config set, choose only a
part of all partitions to schedule
{
- withSQLConf("hoodie.sql.insert.mode" -> "upsert") {
- //use upsert so records are in same filegroup when in same partition
- spark.sql(s"insert into $tableName values(1, 'a1', 10, 1010)")
- spark.sql(s"insert into $tableName values(2, 'a2', 10, 1010)")
- spark.sql(s"insert into $tableName values(3, 'a3', 10, 1011)")
- }
-
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1010)")
+ spark.sql(s"insert into $tableName values(2, 'a2', 10, 1010)")
+ spark.sql(s"insert into $tableName values(3, 'a3', 10, 1011)")
// Do
val result = spark.sql(s"call run_clustering(table => '$tableName', " +
s"selected_partitions => 'ts=1010', show_involved_partition =>
true)")
@@ -659,12 +655,9 @@ class TestClusteringProcedure extends
HoodieSparkProcedureTestBase {
// Test clustering with PARTITION_SELECTED, choose all partitions to
schedule
{
- withSQLConf("hoodie.sql.insert.mode"-> "upsert") {
- //use upsert so records are in same filegroup when in same partition
- spark.sql(s"insert into $tableName values(4, 'a4', 10, 1010)")
- spark.sql(s"insert into $tableName values(5, 'a5', 10, 1011)")
- spark.sql(s"insert into $tableName values(6, 'a6', 10, 1012)")
- }
+ spark.sql(s"insert into $tableName values(4, 'a4', 10, 1010)")
+ spark.sql(s"insert into $tableName values(5, 'a5', 10, 1011)")
+ spark.sql(s"insert into $tableName values(6, 'a6', 10, 1012)")
val result = spark.sql(s"call run_clustering(table => '$tableName', " +
s"selected_partitions => 'ts=1010,ts=1011,ts=1012',
show_involved_partition => true)")
.collect()
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCommitsProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCommitsProcedure.scala
index 9a5d186d83a..80ce82ca88e 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCommitsProcedure.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCommitsProcedure.scala
@@ -42,16 +42,13 @@ class TestCommitsProcedure extends
HoodieSparkProcedureTestBase {
""".stripMargin)
// insert data to table, will generate 5 active commits and 2 archived
commits
- withSQLConf("hoodie.sql.insert.mode" -> "upsert") {
- //use upsert so records are in same filegroup
- spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
- spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
- spark.sql(s"insert into $tableName select 3, 'a3', 30, 2000")
- spark.sql(s"insert into $tableName select 4, 'a4', 40, 2500")
- spark.sql(s"insert into $tableName select 5, 'a5', 50, 3000")
- spark.sql(s"insert into $tableName select 6, 'a6', 60, 3500")
- spark.sql(s"insert into $tableName select 7, 'a7', 70, 4000")
- }
+ spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+ spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
+ spark.sql(s"insert into $tableName select 3, 'a3', 30, 2000")
+ spark.sql(s"insert into $tableName select 4, 'a4', 40, 2500")
+ spark.sql(s"insert into $tableName select 5, 'a5', 50, 3000")
+ spark.sql(s"insert into $tableName select 6, 'a6', 60, 3500")
+ spark.sql(s"insert into $tableName select 7, 'a7', 70, 4000")
// Check required fields
checkExceptionContain(s"""call show_archived_commits(limit => 10)""")(
@@ -95,16 +92,13 @@ class TestCommitsProcedure extends
HoodieSparkProcedureTestBase {
""".stripMargin)
// insert data to table, will generate 5 active commits and 2 archived
commits
- withSQLConf("hoodie.sql.insert.mode" -> "upsert") {
- //use upsert so records are in same filegroup
- spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
- spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
- spark.sql(s"insert into $tableName select 3, 'a3', 30, 2000")
- spark.sql(s"insert into $tableName select 4, 'a4', 40, 2500")
- spark.sql(s"insert into $tableName select 5, 'a5', 50, 3000")
- spark.sql(s"insert into $tableName select 6, 'a6', 60, 3500")
- spark.sql(s"insert into $tableName select 7, 'a7', 70, 4000")
- }
+ spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+ spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
+ spark.sql(s"insert into $tableName select 3, 'a3', 30, 2000")
+ spark.sql(s"insert into $tableName select 4, 'a4', 40, 2500")
+ spark.sql(s"insert into $tableName select 5, 'a5', 50, 3000")
+ spark.sql(s"insert into $tableName select 6, 'a6', 60, 3500")
+ spark.sql(s"insert into $tableName select 7, 'a7', 70, 4000")
// Check required fields
checkExceptionContain(s"""call show_archived_commits_metadata(limit =>
10)""")(
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala
index 930ba7d335a..6a6e74e8b72 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala
@@ -196,11 +196,13 @@ class TestCompactionProcedure extends
HoodieSparkProcedureTestBase {
| location '${tmp.getCanonicalPath}/$tableName1'
""".stripMargin)
spark.sql(s"insert into $tableName1 values(1, 'a1', 10, 1000)")
- withSQLConf("hoodie.sql.insert.mode" -> "upsert") {
- spark.sql(s"insert into $tableName1 values(1, 'a2', 10, 1000)")
- spark.sql(s"insert into $tableName1 values(1, 'a3', 10, 1000)")
- spark.sql(s"insert into $tableName1 values(1, 'a4', 10, 1000)")
- }
+
+ spark.sql(s"insert into $tableName1 values(1, 'a2', 10, 1000)")
+
+ spark.sql(s"insert into $tableName1 values(1, 'a3', 10, 1000)")
+
+ spark.sql(s"insert into $tableName1 values(1, 'a4', 10, 1000)")
+
assertResult(2)(spark.sql(s"call show_compaction(path =>
'${tmp.getCanonicalPath}/$tableName1')").collect().length)
}
}
@@ -224,11 +226,10 @@ class TestCompactionProcedure extends
HoodieSparkProcedureTestBase {
| )
| location '${tmp.getCanonicalPath}'
""".stripMargin)
+
spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
- withSQLConf("hoodie.sql.insert.mode" -> "upsert") {
- spark.sql(s"insert into $tableName values(1, 'a2', 10, 1000)")
- spark.sql(s"insert into $tableName values(1, 'a3', 10, 1000)")
- }
+ spark.sql(s"insert into $tableName values(1, 'a2', 10, 1000)")
+ spark.sql(s"insert into $tableName values(1, 'a3', 10, 1000)")
val result1 = spark.sql(
s"""call run_compaction(table => '$tableName', op => 'run', options
=> "
@@ -238,9 +239,7 @@ class TestCompactionProcedure extends
HoodieSparkProcedureTestBase {
.collect()
assertResult(0)(result1.length)
- withSQLConf("hoodie.sql.insert.mode" -> "upsert") {
- spark.sql(s"insert into $tableName values(1, 'a4', 10, 1000)")
- }
+ spark.sql(s"insert into $tableName values(1, 'a4', 10, 1000)")
val result2 = spark.sql(
s"""call run_compaction(table => '$tableName', op => 'run', options
=> "
|
hoodie.compaction.strategy=org.apache.hudi.table.action.compact.strategy.LogFileNumBasedCompactionStrategy,