This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 4f44eeda5bc [HUDI-8018] Improve Spark SQL tests (#11859)
4f44eeda5bc is described below
commit 4f44eeda5bcbd6a765c8bcf30352c47b47af04d4
Author: Sagar Sumit <[email protected]>
AuthorDate: Tue Sep 10 11:04:34 2024 +0530
[HUDI-8018] Improve Spark SQL tests (#11859)
---
.../TestAutoGenerationOfRecordKeys.scala | 16 +-
.../hudi/functional/TestMetadataRecordIndex.scala | 11 +-
.../functional/TestParquetColumnProjection.scala | 3 +-
.../TestSparkSqlWithTimestampKeyGenerator.scala | 30 +-
.../sql/hudi/ddl/TestAlterTableDropPartition.scala | 72 ++--
.../apache/spark/sql/hudi/ddl/TestSpark3DDL.scala | 219 +++++------
.../spark/sql/hudi/dml/TestCDCForSparkSQL.scala | 74 ++--
.../spark/sql/hudi/dml/TestCompactionTable.scala | 70 ++++
.../spark/sql/hudi/dml/TestDataSkippingQuery.scala | 277 +++++++-------
.../spark/sql/hudi/dml/TestInsertTable.scala | 424 +++++++++++----------
.../spark/sql/hudi/dml/TestMergeIntoTable.scala | 6 +-
.../spark/sql/hudi/dml/TestTimeTravelTable.scala | 246 ++++++------
.../spark/sql/hudi/dml/TestUpdateTable.scala | 63 ++-
13 files changed, 814 insertions(+), 697 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestAutoGenerationOfRecordKeys.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestAutoGenerationOfRecordKeys.scala
index 247454a0626..5c35c28c007 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestAutoGenerationOfRecordKeys.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestAutoGenerationOfRecordKeys.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.{SaveMode, SparkSession,
SparkSessionExtensions}
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.CsvSource
+import org.junit.jupiter.params.provider.{CsvSource, EnumSource}
import java.util.function.Consumer
@@ -210,7 +210,7 @@ class TestAutoGenerationOfRecordKeys extends
HoodieSparkClientTestBase with Scal
var options: Map[String, String] = vanillaWriteOpts ++ Map(
DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key ->
classOf[SimpleKeyGenerator].getCanonicalName)
- // NOTE: In this test we deliberately removing record-key configuration
+ // NOTE: In this test we are deliberately removing record-key configuration
// to validate Hudi is handling this case appropriately
var writeOpts = options -- Seq(DataSourceWriteOptions.RECORDKEY_FIELD.key)
@@ -264,14 +264,16 @@ class TestAutoGenerationOfRecordKeys extends
HoodieSparkClientTestBase with Scal
assertEquals(5, snapshot0.count())
}
- @Test
- def testUpsertsAndDeletesWithPkLess(): Unit = {
+ @ParameterizedTest
+ @EnumSource(value = classOf[HoodieTableType])
+ def testUpsertsAndDeletesWithPkLess(tableType: HoodieTableType): Unit = {
val (vanillaWriteOpts, readOpts) =
getWriterReaderOpts(HoodieRecordType.AVRO)
- var options: Map[String, String] = vanillaWriteOpts ++ Map(
- DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key ->
classOf[SimpleKeyGenerator].getCanonicalName)
+ val options: Map[String, String] = vanillaWriteOpts ++ Map(
+ DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key ->
classOf[SimpleKeyGenerator].getCanonicalName,
+ DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name())
- var writeOpts = options -- Seq(DataSourceWriteOptions.RECORDKEY_FIELD.key)
+ val writeOpts = options -- Seq(DataSourceWriteOptions.RECORDKEY_FIELD.key)
// Insert Operation
val records = recordsToStrings(dataGen.generateInserts("000",
20)).asScala.toList
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataRecordIndex.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataRecordIndex.scala
index 4253f425a58..74686933921 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataRecordIndex.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataRecordIndex.scala
@@ -28,13 +28,13 @@ import org.apache.hudi.common.util.Option
import org.apache.hudi.config.{HoodieClusteringConfig, HoodieWriteConfig}
import org.apache.hudi.metadata.{HoodieBackedTableMetadata,
HoodieTableMetadataUtil, MetadataPartitionType}
import org.apache.hudi.testutils.HoodieSparkClientTestBase
-
import org.apache.spark.sql._
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.api._
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.EnumSource
import java.util.concurrent.atomic.AtomicInteger
-
import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -78,10 +78,11 @@ class TestMetadataRecordIndex extends
HoodieSparkClientTestBase {
cleanupSparkContexts()
}
- @Test
- def testClusteringWithRecordIndex(): Unit = {
+ @ParameterizedTest
+ @EnumSource(classOf[HoodieTableType])
+ def testClusteringWithRecordIndex(tableType: HoodieTableType): Unit = {
val hudiOpts = commonOpts ++ Map(
- TABLE_TYPE.key -> HoodieTableType.COPY_ON_WRITE.name(),
+ TABLE_TYPE.key -> tableType.name(),
HoodieClusteringConfig.INLINE_CLUSTERING.key() -> "true",
HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key() -> "2"
)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
index 8e48bc1a070..b62cf1b9f93 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.{Dataset, HoodieUnsafeUtils, Row, SaveMode}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse,
assertTrue, fail}
-import org.junit.jupiter.api.{Disabled, Tag, Test}
+import org.junit.jupiter.api.{Tag, Test}
import scala.collection.JavaConverters._
import scala.math.abs
@@ -60,7 +60,6 @@ class TestParquetColumnProjection extends
SparkClientFunctionalTestHarness with
override def conf: SparkConf = conf(getSparkSqlConf)
- @Disabled("Currently disabled b/c of the fallback to HadoopFsRelation")
@Test
def testBaseFileOnlyViewRelation(): Unit = {
val tablePath = s"$basePath/cow"
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithTimestampKeyGenerator.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithTimestampKeyGenerator.scala
index eae33bb860d..e7b20318a00 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithTimestampKeyGenerator.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithTimestampKeyGenerator.scala
@@ -44,7 +44,7 @@ class TestSparkSqlWithTimestampKeyGenerator extends
HoodieSparkSqlTestBase {
timestampKeyGeneratorSettings.foreach { keyGeneratorSettings =>
withTable(generateTableName) { tableName =>
// Warning level is used due to CI run with warn-log profile for
quick failed cases identification
- LOG.warn(s"Table '${tableName}' with parameters: ${testParams}.
Timestamp key generator settings: ${keyGeneratorSettings}")
+ LOG.warn(s"Table '$tableName' with parameters: $testParams.
Timestamp key generator settings: $keyGeneratorSettings")
val tablePath = tmp.getCanonicalPath + "/" + tableName
val tsType = if (keyGeneratorSettings.contains("DATE_STRING"))
"string" else "long"
spark.sql(
@@ -53,19 +53,19 @@ class TestSparkSqlWithTimestampKeyGenerator extends
HoodieSparkSqlTestBase {
| id int,
| name string,
| precomb long,
- | ts ${tsType}
+ | ts $tsType
| ) USING HUDI
| PARTITIONED BY (ts)
- | LOCATION '${tablePath}'
+ | LOCATION '$tablePath'
| TBLPROPERTIES (
- | type = '${tableType}',
+ | type = '$tableType',
| primaryKey = 'id',
| preCombineField = 'precomb',
| hoodie.datasource.write.partitionpath.field = 'ts',
| hoodie.datasource.write.hive_style_partitioning = 'false',
- | hoodie.file.group.reader.enabled =
'${shouldUseFileGroupReader}',
+ | hoodie.file.group.reader.enabled =
'$shouldUseFileGroupReader',
| hoodie.table.keygenerator.class =
'org.apache.hudi.keygen.TimestampBasedKeyGenerator',
- | ${keyGeneratorSettings}
+ | $keyGeneratorSettings
| )
|""".stripMargin)
// TODO: couldn't set `TIMESTAMP` for
`hoodie.table.keygenerator.type`, it's overwritten by `SIMPLE`, only
`hoodie.table.keygenerator.class` works
@@ -77,14 +77,14 @@ class TestSparkSqlWithTimestampKeyGenerator extends
HoodieSparkSqlTestBase {
else // UNIX_TIMESTAMP, and SCALAR with SECONDS
(dataBatchesWithLongOfSeconds, queryResultWithLongOfSeconds)
- withSQLConf("hoodie.file.group.reader.enabled" ->
s"${shouldUseFileGroupReader}",
+ withSQLConf("hoodie.file.group.reader.enabled" ->
s"$shouldUseFileGroupReader",
"hoodie.datasource.query.type" -> "snapshot") {
// two partitions, one contains parquet file only, the second
one contains parquet and log files for MOR, and two parquets for COW
- spark.sql(s"INSERT INTO ${tableName} VALUES ${dataBatches(0)}")
- spark.sql(s"INSERT INTO ${tableName} VALUES ${dataBatches(1)}")
+ spark.sql(s"INSERT INTO $tableName VALUES ${dataBatches(0)}")
+ spark.sql(s"INSERT INTO $tableName VALUES ${dataBatches(1)}")
- val queryResult = spark.sql(s"SELECT id, name, precomb, ts FROM
${tableName} ORDER BY id").collect().mkString("; ")
- LOG.warn(s"Query result: ${queryResult}")
+ val queryResult = spark.sql(s"SELECT id, name, precomb, ts FROM
$tableName ORDER BY id").collect().mkString("; ")
+ LOG.warn(s"Query result: $queryResult")
// TODO: use `shouldExtractPartitionValuesFromPartitionPath`
uniformly, and get `expectedQueryResult` for all cases instead of
`expectedQueryResultWithLossyString` for some cases
// After it we could properly process filters like "WHERE ts
BETWEEN 1078016000 and 1718953003" and add tests with partition pruning.
// COW: Fix for [HUDI-3896] overwrites
`shouldExtractPartitionValuesFromPartitionPath` in `BaseFileOnlyRelation`,
therefore for COW we extracting from partition paths and get nulls
@@ -134,18 +134,18 @@ object TestSparkSqlWithTimestampKeyGenerator {
val timestampKeyGeneratorSettings: Array[String] = Array(
s"""
| hoodie.keygen.timebased.timestamp.type = 'UNIX_TIMESTAMP',
- | hoodie.keygen.timebased.output.dateformat =
'${outputDateformat}'""",
+ | hoodie.keygen.timebased.output.dateformat = '$outputDateformat'""",
s"""
| hoodie.keygen.timebased.timestamp.type = 'EPOCHMILLISECONDS',
- | hoodie.keygen.timebased.output.dateformat =
'${outputDateformat}'""",
+ | hoodie.keygen.timebased.output.dateformat = '$outputDateformat'""",
s"""
| hoodie.keygen.timebased.timestamp.type = 'SCALAR',
| hoodie.keygen.timebased.timestamp.scalar.time.unit = 'SECONDS',
- | hoodie.keygen.timebased.output.dateformat =
'${outputDateformat}'""",
+ | hoodie.keygen.timebased.output.dateformat = '$outputDateformat'""",
s"""
| hoodie.keygen.timebased.timestamp.type = 'DATE_STRING',
| hoodie.keygen.timebased.input.dateformat = 'yyyy-MM-dd HH:mm:ss',
- | hoodie.keygen.timebased.output.dateformat = '${outputDateformat}'"""
+ | hoodie.keygen.timebased.output.dateformat = '$outputDateformat'"""
)
// All data batches should correspond to 2004-02-29 01:02:03 and 2024-06-21
06:50:03
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestAlterTableDropPartition.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestAlterTableDropPartition.scala
index 3fcbea3a05c..5f8f2673bc1 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestAlterTableDropPartition.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestAlterTableDropPartition.scala
@@ -520,42 +520,44 @@ class TestAlterTableDropPartition extends
HoodieSparkSqlTestBase {
}
test("Prevent a partition from being dropped if there are pending CLUSTERING
jobs") {
- withTempDir { tmp =>
- val tableName = generateTableName
- val basePath = s"${tmp.getCanonicalPath}t/$tableName"
- val schemaFields = Seq("id", "name", "price", "ts")
- spark.sql(
- s"""
- |create table $tableName (
- | id int,
- | name string,
- | price double,
- | ts long
- |) using hudi
- | options (
- | primaryKey ='id',
- | type = 'cow',
- | preCombineField = 'ts'
- | )
- | partitioned by(ts)
- | location '$basePath'
- | """.stripMargin)
- spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
- spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
- spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
- val client = HoodieCLIUtils.createHoodieWriteClient(spark, basePath,
Map.empty, Option(tableName))
-
- // Generate the first clustering plan
- val firstScheduleInstant = client.createNewInstantTime()
- client.scheduleClusteringAtInstant(firstScheduleInstant, HOption.empty())
-
- checkAnswer(s"call show_clustering('$tableName')")(
- Seq(firstScheduleInstant, 3, HoodieInstant.State.REQUESTED.name(), "*")
- )
+ Seq("cow", "mor").foreach { tableType =>
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ val basePath = s"${tmp.getCanonicalPath}t/$tableName"
+ val schemaFields = Seq("id", "name", "price", "ts")
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | options (
+ | primaryKey ='id',
+ | type = '$tableType',
+ | preCombineField = 'ts'
+ | )
+ | partitioned by(ts)
+ | location '$basePath'
+ | """.stripMargin)
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+ spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
+ spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
+ val client = HoodieCLIUtils.createHoodieWriteClient(spark, basePath,
Map.empty, Option(tableName))
+
+ // Generate the first clustering plan
+ val firstScheduleInstant = client.createNewInstantTime()
+ client.scheduleClusteringAtInstant(firstScheduleInstant,
HOption.empty())
+
+ checkAnswer(s"call show_clustering('$tableName')")(
+ Seq(firstScheduleInstant, 3, HoodieInstant.State.REQUESTED.name(),
"*")
+ )
- val partition = "ts=1002"
- val errMsg = s"Failed to drop partitions. Please ensure that there are
no pending table service actions (clustering/compaction) for the partitions to
be deleted: [$partition]"
- checkExceptionContain(s"ALTER TABLE $tableName DROP
PARTITION($partition)")(errMsg)
+ val partition = "ts=1002"
+ val errMsg = s"Failed to drop partitions. Please ensure that there are
no pending table service actions (clustering/compaction) for the partitions to
be deleted: [$partition]"
+ checkExceptionContain(s"ALTER TABLE $tableName DROP
PARTITION($partition)")(errMsg)
+ }
}
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
index 385ba06ba5e..fb39e8eaef7 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
@@ -41,7 +41,7 @@ import scala.collection.JavaConverters._
class TestSpark3DDL extends HoodieSparkSqlTestBase {
def createTestResult(tableName: String): Array[Row] = {
- spark.sql(s"select * from ${tableName} order by id")
+ spark.sql(s"select * from $tableName order by id")
.drop("_hoodie_commit_time", "_hoodie_commit_seqno",
"_hoodie_record_key", "_hoodie_partition_path", "_hoodie_file_name").collect()
}
@@ -184,53 +184,55 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
}
test("Test Enable and Disable Schema on read") {
- withTempDir { tmp =>
- val tableName = generateTableName
- val tablePath = s"${tmp.getCanonicalPath}/$tableName"
- if (HoodieSparkUtils.gteqSpark3_3) {
- spark.sql("set hoodie.schema.on.read.enable=true")
- // Create table
- spark.sql(
- s"""
- |create table $tableName (
- | id int,
- | name string,
- | price double,
- | ts long
- |) using hudi
- | location '$tablePath'
- | tblproperties (
- | type = 'cow',
- | primaryKey = 'id',
- | preCombineField = 'ts'
- | )
+ withTempDir { tmp =>
+ Seq("cow", "mor").foreach { tableType =>
+ val tableName = generateTableName
+ val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+ if (HoodieSparkUtils.gteqSpark3_3) {
+ spark.sql("set hoodie.schema.on.read.enable=true")
+ // Create table
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | location '$tablePath'
+ | tblproperties (
+ | type = '$tableType',
+ | primaryKey = 'id',
+ | preCombineField = 'ts'
+ | )
""".stripMargin)
- // Insert data to the new table.
- spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
- checkAnswer(s"select id, name, price, ts from $tableName")(
- Seq(1, "a1", 10.0, 1000)
- )
-
- // add column
- spark.sql(s"alter table $tableName add columns(new_col string)")
- val catalogTable = spark.sessionState.catalog.getTableMetadata(new
TableIdentifier(tableName))
- assertResult(Seq("id", "name", "price", "ts", "new_col")) {
-
HoodieSqlCommonUtils.removeMetaFields(catalogTable.schema).fields.map(_.name)
+ // Insert data to the new table.
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+ checkAnswer(s"select id, name, price, ts from $tableName")(
+ Seq(1, "a1", 10.0, 1000)
+ )
+
+ // add column
+ spark.sql(s"alter table $tableName add columns(new_col string)")
+ val catalogTable = spark.sessionState.catalog.getTableMetadata(new
TableIdentifier(tableName))
+ assertResult(Seq("id", "name", "price", "ts", "new_col")) {
+
HoodieSqlCommonUtils.removeMetaFields(catalogTable.schema).fields.map(_.name)
+ }
+ checkAnswer(s"select id, name, price, ts, new_col from $tableName")(
+ Seq(1, "a1", 10.0, 1000, null)
+ )
+ // disable schema on read.
+ spark.sql("set hoodie.schema.on.read.enable=false")
+ spark.sql(s"refresh table $tableName")
+ // Insert data to the new table.
+ spark.sql(s"insert into $tableName values(2, 'a2', 12, 2000, 'e0')")
+ // write should succeed. and subsequent read should succeed as well.
+ checkAnswer(s"select id, name, price, ts, new_col from $tableName")(
+ Seq(1, "a1", 10.0, 1000, null),
+ Seq(2, "a2", 12.0, 2000, "e0")
+ )
}
- checkAnswer(s"select id, name, price, ts, new_col from $tableName")(
- Seq(1, "a1", 10.0, 1000, null)
- )
- // disable schema on read.
- spark.sql("set hoodie.schema.on.read.enable=false")
- spark.sql(s"refresh table $tableName")
- // Insert data to the new table.
- spark.sql(s"insert into $tableName values(2, 'a2', 12, 2000, 'e0')")
- // write should succeed. and subsequent read should succeed as well.
- checkAnswer(s"select id, name, price, ts, new_col from $tableName")(
- Seq(1, "a1", 10.0, 1000, null),
- Seq(2, "a2", 12.0, 2000, "e0")
- )
}
}
}
@@ -896,7 +898,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
}
}
- test("Test DOUBLE to DECIMAL schema evolution (lost in scale)") {
+ test("Test DOUBLE or STRING to DECIMAL schema evolution (lost in scale)") {
Seq("cow", "mor").foreach { tableType =>
withTempDir { tmp =>
// Using INMEMORY index for mor table so that log files will be
created instead of parquet
@@ -957,57 +959,6 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
Seq(12, "a12", "-10.04", 1000)
)
- // clear after using INMEMORY index
- HoodieInMemoryHashIndex.clear()
- }
- }
- }
- }
-
- test("Test STRING to DECIMAL schema evolution (lost in scale)") {
- Seq("cow", "mor").foreach { tableType =>
- withTempDir { tmp =>
- // Using INMEMORY index for mor table so that log files will be
created instead of parquet
- val tableName = generateTableName
- if (HoodieSparkUtils.gteqSpark3_3) {
- spark.sql(
- s"""
- |create table $tableName (
- | id int,
- | name string,
- | price string,
- | ts long
- |) using hudi
- | location '${tmp.getCanonicalPath}'
- | tblproperties (
- | primaryKey = 'id',
- | type = '$tableType',
- | preCombineField = 'ts'
- | ${if (tableType.equals("mor")) ", hoodie.index.type =
'INMEMORY'" else ""}
- | )
- """.stripMargin)
-
- spark.sql(s"insert into $tableName values " +
- // testing the rounding behaviour to ensure that HALF_UP is used
for positive values
- "(1, 'a1', '10.024', 1000)," +
- "(2, 'a2', '10.025', 1000)," +
- "(3, 'a3', '10.026', 1000)," +
- // testing the rounding behaviour to ensure that HALF_UP is used
for negative values
- "(4, 'a4', '-10.024', 1000)," +
- "(5, 'a5', '-10.025', 1000)," +
- "(6, 'a6', '-10.026', 1000)," +
- // testing the GENERAL rounding behaviour (HALF_UP and HALF_EVEN
will retain the same result)
- "(7, 'a7', '10.034', 1000)," +
- "(8, 'a8', '10.035', 1000)," +
- "(9, 'a9', '10.036', 1000)," +
- // testing the GENERAL rounding behaviour (HALF_UP and HALF_EVEN
will retain the same result)
- "(10, 'a10', '-10.034', 1000)," +
- "(11, 'a11', '-10.035', 1000)," +
- "(12, 'a12', '-10.036', 1000)")
-
-
assertResult(tableType.equals("mor"))(DataSourceTestUtils.isLogFileOnly(tmp.getCanonicalPath))
-
- spark.sql("set hoodie.schema.on.read.enable=true")
spark.sql(s"alter table $tableName alter column price type
decimal(4, 2)")
checkAnswer(s"select id, name, cast(price as string), ts from
$tableName order by id")(
@@ -1033,42 +984,44 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
}
test("Test extract partition values from path when schema evolution is
enabled") {
- withTable(generateTableName) { tableName =>
- spark.sql(
- s"""
- |create table $tableName (
- | id int,
- | name string,
- | ts bigint,
- | region string,
- | dt date
- |) using hudi
- |tblproperties (
- | primaryKey = 'id',
- | type = 'cow',
- | preCombineField = 'ts'
- |)
- |partitioned by (region, dt)""".stripMargin)
-
- withSQLConf("hoodie.datasource.read.extract.partition.values.from.path"
-> "true",
- "hoodie.schema.on.read.enable" -> "true") {
- spark.sql(s"insert into $tableName partition (region='reg1',
dt='2023-10-01') " +
- s"select 1, 'name1', 1000")
- checkAnswer(s"select id, name, ts, region, cast(dt as string) from
$tableName where region='reg1'")(
- Seq(1, "name1", 1000, "reg1", "2023-10-01")
- )
-
- // apply schema evolution and perform a read again
- spark.sql(s"alter table $tableName add columns(price double)")
- checkAnswer(s"select id, name, ts, region, cast(dt as string) from
$tableName where region='reg1'")(
- Seq(1, "name1", 1000, "reg1", "2023-10-01")
- )
-
- // ensure this won't be broken in the future
- // BooleanSimplification is always applied when calling
HoodieDataSourceHelper#getNonPartitionFilters
- checkAnswer(s"select id, name, ts, region, cast(dt as string) from
$tableName where not(region='reg2' or id=2)")(
- Seq(1, "name1", 1000, "reg1", "2023-10-01")
- )
+ Seq("cow", "mor").foreach { tableType =>
+ withTable(generateTableName) { tableName =>
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | ts bigint,
+ | region string,
+ | dt date
+ |) using hudi
+ |tblproperties (
+ | primaryKey = 'id',
+ | type = '$tableType',
+ | preCombineField = 'ts'
+ |)
+ |partitioned by (region, dt)""".stripMargin)
+
+
withSQLConf("hoodie.datasource.read.extract.partition.values.from.path" ->
"true",
+ "hoodie.schema.on.read.enable" -> "true") {
+ spark.sql(s"insert into $tableName partition (region='reg1',
dt='2023-10-01') " +
+ s"select 1, 'name1', 1000")
+ checkAnswer(s"select id, name, ts, region, cast(dt as string) from
$tableName where region='reg1'")(
+ Seq(1, "name1", 1000, "reg1", "2023-10-01")
+ )
+
+ // apply schema evolution and perform a read again
+ spark.sql(s"alter table $tableName add columns(price double)")
+ checkAnswer(s"select id, name, ts, region, cast(dt as string) from
$tableName where region='reg1'")(
+ Seq(1, "name1", 1000, "reg1", "2023-10-01")
+ )
+
+ // ensure this won't be broken in the future
+ // BooleanSimplification is always applied when calling
HoodieDataSourceHelper#getNonPartitionFilters
+ checkAnswer(s"select id, name, ts, region, cast(dt as string) from
$tableName where not(region='reg2' or id=2)")(
+ Seq(1, "name1", 1000, "reg1", "2023-10-01")
+ )
+ }
}
}
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestCDCForSparkSQL.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestCDCForSparkSQL.scala
index 9275476682e..cfa91f2cf01 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestCDCForSparkSQL.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestCDCForSparkSQL.scala
@@ -48,44 +48,46 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase {
}
test("Test delete all records in filegroup") {
- withTempDir { tmp =>
- val databaseName = "hudi_database"
- spark.sql(s"create database if not exists $databaseName")
- spark.sql(s"use $databaseName")
- val tableName = generateTableName
- val basePath = s"${tmp.getCanonicalPath}/$tableName"
- spark.sql(
- s"""
- | create table $tableName (
- | id int,
- | name string,
- | price double,
- | ts long
- | ) using hudi
- | partitioned by (name)
- | tblproperties (
- | 'primaryKey' = 'id',
- | 'preCombineField' = 'ts',
- | 'hoodie.table.cdc.enabled' = 'true',
- | 'hoodie.table.cdc.supplemental.logging.mode' =
'$DATA_BEFORE_AFTER',
- | type = 'cow'
- | )
- | location '$basePath'
+ Seq("cow", "mor").foreach { tableType =>
+ withTempDir { tmp =>
+ val databaseName = "hudi_database"
+ spark.sql(s"create database if not exists $databaseName")
+ spark.sql(s"use $databaseName")
+ val tableName = generateTableName
+ val basePath = s"${tmp.getCanonicalPath}/$tableName"
+ spark.sql(
+ s"""
+ | create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ | ) using hudi
+ | partitioned by (name)
+ | tblproperties (
+ | 'primaryKey' = 'id',
+ | 'preCombineField' = 'ts',
+ | 'hoodie.table.cdc.enabled' = 'true',
+ | 'hoodie.table.cdc.supplemental.logging.mode' =
'$DATA_BEFORE_AFTER',
+ | type = '$tableType'
+ | )
+ | location '$basePath'
""".stripMargin)
- val metaClient = createMetaClient(spark, basePath)
- spark.sql(s"insert into $tableName values (1, 11, 1000, 'a1'), (2, 12,
1000, 'a2')")
- assert(spark.sql(s"select _hoodie_file_name from
$tableName").distinct().count() == 2)
- val fgForID1 = spark.sql(s"select _hoodie_file_name from $tableName
where id=1").head().get(0)
- val commitTime1 =
metaClient.reloadActiveTimeline.lastInstant().get().getTimestamp
- val cdcDataOnly1 = cdcDataFrame(basePath, commitTime1.toLong - 1)
- cdcDataOnly1.show(false)
- assertCDCOpCnt(cdcDataOnly1, 2, 0, 0)
+ val metaClient = createMetaClient(spark, basePath)
+ spark.sql(s"insert into $tableName values (1, 11, 1000, 'a1'), (2, 12,
1000, 'a2')")
+ assert(spark.sql(s"select _hoodie_file_name from
$tableName").distinct().count() == 2)
+ val fgForID1 = spark.sql(s"select _hoodie_file_name from $tableName
where id=1").head().get(0)
+ val commitTime1 =
metaClient.reloadActiveTimeline.lastInstant().get().getTimestamp
+ val cdcDataOnly1 = cdcDataFrame(basePath, commitTime1.toLong - 1)
+ cdcDataOnly1.show(false)
+ assertCDCOpCnt(cdcDataOnly1, 2, 0, 0)
- spark.sql(s"delete from $tableName where id = 1")
- val cdcDataOnly2 = cdcDataFrame(basePath, commitTime1.toLong)
- assertCDCOpCnt(cdcDataOnly2, 0, 0, 1)
- assert(spark.sql(s"select _hoodie_file_name from
$tableName").distinct().count() == 1)
- assert(!spark.sql(s"select _hoodie_file_name from
$tableName").head().get(0).equals(fgForID1))
+ spark.sql(s"delete from $tableName where id = 1")
+ val cdcDataOnly2 = cdcDataFrame(basePath, commitTime1.toLong)
+ assertCDCOpCnt(cdcDataOnly2, 0, 0, 1)
+ assert(spark.sql(s"select _hoodie_file_name from
$tableName").distinct().count() == 1)
+ assert(!spark.sql(s"select _hoodie_file_name from
$tableName").head().get(0).equals(fgForID1))
+ }
}
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestCompactionTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestCompactionTable.scala
index 31948c3298d..07d4c302b0d 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestCompactionTable.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestCompactionTable.scala
@@ -136,4 +136,74 @@ class TestCompactionTable extends HoodieSparkSqlTestBase {
)
}
}
+
+ test("Test compaction before and after deletes") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | location '${tmp.getCanonicalPath}'
+ | tblproperties (
+ | primaryKey ='id',
+ | type = 'mor',
+ | preCombineField = 'ts'
+ | )
+ """.stripMargin)
+ spark.sql("set hoodie.parquet.max.file.size = 10000")
+ // disable automatic inline compaction
+ spark.sql("set hoodie.compact.inline=false")
+ spark.sql("set hoodie.compact.schedule.inline=false")
+ // set compaction frequency to every 2 commits
+ spark.sql("set hoodie.compact.inline.max.delta.commits=2")
+ // insert data
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+ spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)")
+ spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000)")
+ // update data
+ spark.sql(s"update $tableName set price = 11 where id = 1")
+ // update data
+ spark.sql(s"update $tableName set price = 12 where id = 2")
+ // schedule compaction
+ spark.sql(s"schedule compaction on $tableName")
+ // show compaction
+ var compactionRows = spark.sql(s"show compaction on $tableName limit
10").collect()
+ var timestamps = compactionRows.map(_.getString(0))
+ assertResult(1)(timestamps.length)
+ // run compaction
+ spark.sql(s"run compaction on $tableName at ${timestamps(0)}")
+ // check data
+ checkAnswer(s"select id, name, price, ts from $tableName order by id")(
+ Seq(1, "a1", 11.0, 1000),
+ Seq(2, "a2", 12.0, 1000),
+ Seq(3, "a3", 10.0, 1000)
+ )
+ // show compaction
+ assertResult(1)(spark.sql(s"show compaction on
$tableName").collect().length)
+ // Try deleting non-existent row
+ spark.sql(s"DELETE FROM $tableName WHERE id = 41")
+ // Delete record identified by some field other than the primary-key
+ spark.sql(s"DELETE FROM $tableName WHERE name = 'a2'")
+ // schedule compaction
+ spark.sql(s"schedule compaction on $tableName")
+ // show compaction
+ compactionRows = spark.sql(s"show compaction on $tableName limit
10").collect()
+ timestamps = compactionRows.map(_.getString(0)).sorted
+ assertResult(2)(timestamps.length)
+ // run compaction
+ spark.sql(s"run compaction on $tableName at ${timestamps(1)}")
+ // check data, only 2 records should be present
+ checkAnswer(s"select id, name, price, ts from $tableName order by id")(
+ Seq(1, "a1", 11.0, 1000),
+ Seq(3, "a3", 10.0, 1000)
+ )
+ // show compaction
+ assertResult(2)(spark.sql(s"show compaction on $tableName limit
10").collect().length)
+ }
+ }
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestDataSkippingQuery.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestDataSkippingQuery.scala
index 4be2532fb25..4becf4c7f0a 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestDataSkippingQuery.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestDataSkippingQuery.scala
@@ -29,169 +29,176 @@ class TestDataSkippingQuery extends HoodieSparkSqlTestBase {
test("Test the data skipping query involves conditions " +
"that cover both columns supported by column stats and those that are not
supported.") {
- withTempDir { tmp =>
- val tableName = generateTableName
- spark.sql("set hoodie.metadata.enable = true")
- spark.sql("set hoodie.metadata.index.column.stats.enable = true")
- spark.sql("set hoodie.enable.data.skipping = true")
- spark.sql(
- s"""
- |create table $tableName (
- | id int,
- | name string,
- | attributes map<string, string>,
- | price double,
- | ts long,
- | dt string
- |) using hudi
- | tblproperties (primaryKey = 'id')
- | partitioned by (dt)
- | location '${tmp.getCanonicalPath}'
+ Seq("cow", "mor").foreach { tableType =>
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ spark.sql("set hoodie.metadata.enable = true")
+ spark.sql("set hoodie.metadata.index.column.stats.enable = true")
+ spark.sql("set hoodie.enable.data.skipping = true")
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | attributes map<string, string>,
+ | price double,
+ | ts long,
+ | dt string
+ |) using hudi
+ | tblproperties (primaryKey = 'id', type = '$tableType')
+ | partitioned by (dt)
+ | location '${tmp.getCanonicalPath}'
""".stripMargin)
- spark.sql(
- s"""
- | insert into $tableName values
- | (1, 'a1', map('color', 'red', 'size', 'M'), 10, 1000,
'2021-01-05'),
- | (2, 'a2', map('color', 'blue', 'size', 'L'), 20, 2000,
'2021-01-06'),
- | (3, 'a3', map('color', 'green', 'size', 'S'), 30, 3000,
'2021-01-07')
+ spark.sql(
+ s"""
+ | insert into $tableName values
+ | (1, 'a1', map('color', 'red', 'size', 'M'), 10, 1000,
'2021-01-05'),
+ | (2, 'a2', map('color', 'blue', 'size', 'L'), 20, 2000,
'2021-01-06'),
+ | (3, 'a3', map('color', 'green', 'size', 'S'), 30, 3000,
'2021-01-07')
""".stripMargin)
- // Check the case where the WHERE condition only includes columns not
supported by column stats
- checkAnswer(s"select id, name, price, ts, dt from $tableName where
attributes.color = 'red'")(
- Seq(1, "a1", 10.0, 1000, "2021-01-05")
- )
- // Check the case where the WHERE condition only includes columns
supported by column stats
- checkAnswer(s"select id, name, price, ts, dt from $tableName where
name='a1'")(
- Seq(1, "a1", 10.0, 1000, "2021-01-05")
- )
- // Check the case where the WHERE condition includes both columns
supported by column stats and those that are not
- checkAnswer(s"select id, name, price, ts, dt from $tableName where
attributes.color = 'red' and name='a1'")(
- Seq(1, "a1", 10.0, 1000, "2021-01-05")
- )
+ // Check the case where the WHERE condition only includes columns not
supported by column stats
+ checkAnswer(s"select id, name, price, ts, dt from $tableName where
attributes.color = 'red'")(
+ Seq(1, "a1", 10.0, 1000, "2021-01-05")
+ )
+ // Check the case where the WHERE condition only includes columns
supported by column stats
+ checkAnswer(s"select id, name, price, ts, dt from $tableName where
name='a1'")(
+ Seq(1, "a1", 10.0, 1000, "2021-01-05")
+ )
+ // Check the case where the WHERE condition includes both columns
supported by column stats and those that are not
+ checkAnswer(s"select id, name, price, ts, dt from $tableName where
attributes.color = 'red' and name='a1'")(
+ Seq(1, "a1", 10.0, 1000, "2021-01-05")
+ )
+ }
}
}
test("Test data skipping when specifying columns with column stats
support.") {
- withTempDir { tmp =>
- val tableName = generateTableName
- spark.sql("set hoodie.metadata.enable = true")
- spark.sql("set hoodie.metadata.index.column.stats.enable = true")
- spark.sql("set hoodie.enable.data.skipping = true")
- spark.sql("set hoodie.metadata.index.column.stats.column.list = name")
- spark.sql(
- s"""
- |create table $tableName (
- | id int,
- | name string,
- | attributes map<string, string>,
- | price double,
- | ts long,
- | dt string
- |) using hudi
- | tblproperties (primaryKey = 'id')
- | partitioned by (dt)
- | location '${tmp.getCanonicalPath}'
- """.stripMargin)
- spark.sql(
- s"""
- | insert into $tableName values
- | (1, 'a1', map('color', 'red', 'size', 'M'), 10, 1000,
'2021-01-05'),
- | (2, 'a2', map('color', 'blue', 'size', 'L'), 20, 2000,
'2021-01-06'),
- | (3, 'a3', map('color', 'green', 'size', 'S'), 30, 3000,
'2021-01-07')
- """.stripMargin)
- // Check the case where the WHERE condition only includes columns not
supported by column stats
- checkAnswer(s"select id, name, price, ts, dt from $tableName where
attributes.color = 'red'")(
- Seq(1, "a1", 10.0, 1000, "2021-01-05")
- )
- // Check the case where the WHERE condition only includes columns
supported by column stats
- checkAnswer(s"select id, name, price, ts, dt from $tableName where
name='a1'")(
- Seq(1, "a1", 10.0, 1000, "2021-01-05")
- )
- // Check the case where the WHERE condition includes both columns
supported by column stats and those that are not
- checkAnswer(s"select id, name, price, ts, dt from $tableName where
attributes.color = 'red' and name='a1'")(
- Seq(1, "a1", 10.0, 1000, "2021-01-05")
- )
- // Check WHERE condition that includes both columns with existing column
stats and columns of types
- // that support column stats but for which column stats do not exist
- checkAnswer(s"select id, name, price, ts, dt from $tableName where
ts=1000 and name='a1'")(
- Seq(1, "a1", 10.0, 1000, "2021-01-05")
- )
- }
- }
-
- test("bucket index query") {
- // table bucket prop can not be read in the query sql now, so need to set
these configs
- withSQLConf("hoodie.enable.data.skipping" -> "true",
- "hoodie.bucket.index.hash.field" -> "id",
- "hoodie.bucket.index.num.buckets" -> "20",
- "hoodie.index.type" -> "BUCKET") {
+ Seq("cow", "mor").foreach { tableType =>
withTempDir { tmp =>
val tableName = generateTableName
- // Create a partitioned table
+ spark.sql("set hoodie.metadata.enable = true")
+ spark.sql("set hoodie.metadata.index.column.stats.enable = true")
+ spark.sql("set hoodie.enable.data.skipping = true")
+ spark.sql("set hoodie.metadata.index.column.stats.column.list = name")
spark.sql(
s"""
|create table $tableName (
| id int,
- | dt string,
| name string,
+ | attributes map<string, string>,
| price double,
- | ts long
+ | ts long,
+ | dt string
|) using hudi
- | tblproperties (
- | primaryKey = 'id,name',
- | preCombineField = 'ts',
- | hoodie.index.type = 'BUCKET',
- | hoodie.bucket.index.hash.field = 'id',
- | hoodie.bucket.index.num.buckets = '20')
+ | tblproperties (primaryKey = 'id', type = '$tableType')
| partitioned by (dt)
| location '${tmp.getCanonicalPath}'
- """.stripMargin)
-
+ """.stripMargin)
spark.sql(
s"""
| insert into $tableName values
- | (1, 'a1', 10, 1000, "2021-01-05"),
- | (2, 'a2', 20, 2000, "2021-01-06"),
- | (3, 'a3', 30, 3000, "2021-01-07")
- """.stripMargin)
-
- checkAnswer(s"select id, name, price, ts, dt from $tableName where id
= 1")(
+ | (1, 'a1', map('color', 'red', 'size', 'M'), 10, 1000,
'2021-01-05'),
+ | (2, 'a2', map('color', 'blue', 'size', 'L'), 20, 2000,
'2021-01-06'),
+ | (3, 'a3', map('color', 'green', 'size', 'S'), 30, 3000,
'2021-01-07')
+ """.stripMargin)
+ // Check the case where the WHERE condition only includes columns not
supported by column stats
+ checkAnswer(s"select id, name, price, ts, dt from $tableName where
attributes.color = 'red'")(
Seq(1, "a1", 10.0, 1000, "2021-01-05")
)
- checkAnswer(s"select id, name, price, ts, dt from $tableName where id
= 1 and name = 'a1'")(
+ // Check the case where the WHERE condition only includes columns
supported by column stats
+ checkAnswer(s"select id, name, price, ts, dt from $tableName where
name='a1'")(
Seq(1, "a1", 10.0, 1000, "2021-01-05")
)
- checkAnswer(s"select id, name, price, ts, dt from $tableName where id
= 2 or id = 5")(
- Seq(2, "a2", 20.0, 2000, "2021-01-06")
- )
- checkAnswer(s"select id, name, price, ts, dt from $tableName where id
in (2, 3)")(
- Seq(2, "a2", 20.0, 2000, "2021-01-06"),
- Seq(3, "a3", 30.0, 3000, "2021-01-07")
- )
- checkAnswer(s"select id, name, price, ts, dt from $tableName where id
!= 4")(
- Seq(1, "a1", 10.0, 1000, "2021-01-05"),
- Seq(2, "a2", 20.0, 2000, "2021-01-06"),
- Seq(3, "a3", 30.0, 3000, "2021-01-07")
- )
- spark.sql("set hoodie.bucket.index.query.pruning = false")
- checkAnswer(s"select id, name, price, ts, dt from $tableName where id
= 1")(
+ // Check the case where the WHERE condition includes both columns
supported by column stats and those that are not
+ checkAnswer(s"select id, name, price, ts, dt from $tableName where
attributes.color = 'red' and name='a1'")(
Seq(1, "a1", 10.0, 1000, "2021-01-05")
)
- checkAnswer(s"select id, name, price, ts, dt from $tableName where id
= 1 and name = 'a1'")(
+ // Check WHERE condition that includes both columns with existing
column stats and columns of types
+ // that support column stats but for which column stats do not exist
+ checkAnswer(s"select id, name, price, ts, dt from $tableName where
ts=1000 and name='a1'")(
Seq(1, "a1", 10.0, 1000, "2021-01-05")
)
- checkAnswer(s"select id, name, price, ts, dt from $tableName where id
= 2 or id = 5")(
- Seq(2, "a2", 20.0, 2000, "2021-01-06")
- )
- checkAnswer(s"select id, name, price, ts, dt from $tableName where id
in (2, 3)")(
- Seq(2, "a2", 20.0, 2000, "2021-01-06"),
- Seq(3, "a3", 30.0, 3000, "2021-01-07")
- )
- checkAnswer(s"select id, name, price, ts, dt from $tableName where id
!= 4")(
- Seq(1, "a1", 10.0, 1000, "2021-01-05"),
- Seq(2, "a2", 20.0, 2000, "2021-01-06"),
- Seq(3, "a3", 30.0, 3000, "2021-01-07")
- )
- spark.sql("set hoodie.bucket.index.query.pruning = true")
+ }
+ }
+ }
+
+ test("bucket index query") {
+ Seq("cow", "mor").foreach { tableType =>
+ // table bucket prop can not be read in the query sql now, so need to
set these configs
+ withSQLConf("hoodie.enable.data.skipping" -> "true",
+ "hoodie.bucket.index.hash.field" -> "id",
+ "hoodie.bucket.index.num.buckets" -> "20",
+ "hoodie.index.type" -> "BUCKET") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ // Create a partitioned table
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | dt string,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | tblproperties (
+ | primaryKey = 'id,name',
+ | preCombineField = 'ts',
+ | type = '$tableType',
+ | hoodie.index.type = 'BUCKET',
+ | hoodie.bucket.index.hash.field = 'id',
+ | hoodie.bucket.index.num.buckets = '20')
+ | partitioned by (dt)
+ | location '${tmp.getCanonicalPath}'
+ """.stripMargin)
+
+ spark.sql(
+ s"""
+ | insert into $tableName values
+ | (1, 'a1', 10, 1000, "2021-01-05"),
+ | (2, 'a2', 20, 2000, "2021-01-06"),
+ | (3, 'a3', 30, 3000, "2021-01-07")
+ """.stripMargin)
+
+ checkAnswer(s"select id, name, price, ts, dt from $tableName where
id = 1")(
+ Seq(1, "a1", 10.0, 1000, "2021-01-05")
+ )
+ checkAnswer(s"select id, name, price, ts, dt from $tableName where
id = 1 and name = 'a1'")(
+ Seq(1, "a1", 10.0, 1000, "2021-01-05")
+ )
+ checkAnswer(s"select id, name, price, ts, dt from $tableName where
id = 2 or id = 5")(
+ Seq(2, "a2", 20.0, 2000, "2021-01-06")
+ )
+ checkAnswer(s"select id, name, price, ts, dt from $tableName where
id in (2, 3)")(
+ Seq(2, "a2", 20.0, 2000, "2021-01-06"),
+ Seq(3, "a3", 30.0, 3000, "2021-01-07")
+ )
+ checkAnswer(s"select id, name, price, ts, dt from $tableName where
id != 4")(
+ Seq(1, "a1", 10.0, 1000, "2021-01-05"),
+ Seq(2, "a2", 20.0, 2000, "2021-01-06"),
+ Seq(3, "a3", 30.0, 3000, "2021-01-07")
+ )
+ spark.sql("set hoodie.bucket.index.query.pruning = false")
+ checkAnswer(s"select id, name, price, ts, dt from $tableName where
id = 1")(
+ Seq(1, "a1", 10.0, 1000, "2021-01-05")
+ )
+ checkAnswer(s"select id, name, price, ts, dt from $tableName where
id = 1 and name = 'a1'")(
+ Seq(1, "a1", 10.0, 1000, "2021-01-05")
+ )
+ checkAnswer(s"select id, name, price, ts, dt from $tableName where
id = 2 or id = 5")(
+ Seq(2, "a2", 20.0, 2000, "2021-01-06")
+ )
+ checkAnswer(s"select id, name, price, ts, dt from $tableName where
id in (2, 3)")(
+ Seq(2, "a2", 20.0, 2000, "2021-01-06"),
+ Seq(3, "a3", 30.0, 3000, "2021-01-07")
+ )
+ checkAnswer(s"select id, name, price, ts, dt from $tableName where
id != 4")(
+ Seq(1, "a1", 10.0, 1000, "2021-01-05"),
+ Seq(2, "a2", 20.0, 2000, "2021-01-06"),
+ Seq(3, "a3", 30.0, 3000, "2021-01-07")
+ )
+ spark.sql("set hoodie.bucket.index.query.pruning = true")
+ }
}
}
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
index 9390df7b73a..f75ddbbd6b4 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
@@ -192,160 +192,166 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
}
test("Test Insert Into with static partition") {
- withTempDir { tmp =>
- val tableName = generateTableName
- // Create a partitioned table
- spark.sql(
- s"""
- |create table $tableName (
- | id int,
- | dt string,
- | name string,
- | price double,
- | ts long
- |) using hudi
- | tblproperties (primaryKey = 'id')
- | partitioned by (dt)
- | location '${tmp.getCanonicalPath}'
+ Seq("cow", "mor").foreach { tableType =>
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ // Create a partitioned table
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | dt string,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | tblproperties (primaryKey = 'id', type = '$tableType')
+ | partitioned by (dt)
+ | location '${tmp.getCanonicalPath}'
""".stripMargin)
- // Insert into static partition
- spark.sql(
- s"""
- | insert into $tableName partition(dt = '2021-01-05')
- | select 1 as id, 'a1' as name, 10 as price, 1000 as ts
+ // Insert into static partition
+ spark.sql(
+ s"""
+ | insert into $tableName partition(dt = '2021-01-05')
+ | select 1 as id, 'a1' as name, 10 as price, 1000 as ts
""".stripMargin)
- spark.sql(
- s"""
- | insert into $tableName partition(dt = '2021-01-06')
- | select 20 as price, 2000 as ts, 2 as id, 'a2' as name
+ spark.sql(
+ s"""
+ | insert into $tableName partition(dt = '2021-01-06')
+ | select 20 as price, 2000 as ts, 2 as id, 'a2' as name
""".stripMargin)
- // should not mess with the original order after write the out-of-order
data.
- val metaClient = createMetaClient(spark, tmp.getCanonicalPath)
- val schema = HoodieSqlCommonUtils.getTableSqlSchema(metaClient).get
- assert(schema.getFieldIndex("id").contains(0))
- assert(schema.getFieldIndex("price").contains(2))
+ // should not mess with the original order after write the
out-of-order data.
+ val metaClient = createMetaClient(spark, tmp.getCanonicalPath)
+ val schema = HoodieSqlCommonUtils.getTableSqlSchema(metaClient).get
+ assert(schema.getFieldIndex("id").contains(0))
+ assert(schema.getFieldIndex("price").contains(2))
- // Note: Do not write the field alias, the partition field must be
placed last.
- spark.sql(
- s"""
- | insert into $tableName
- | select 3, 'a3', 30, 3000, '2021-01-07'
+ // Note: Do not write the field alias, the partition field must be
placed last.
+ spark.sql(
+ s"""
+ | insert into $tableName
+ | select 3, 'a3', 30, 3000, '2021-01-07'
""".stripMargin)
- checkAnswer(s"select id, name, price, ts, dt from $tableName")(
- Seq(1, "a1", 10.0, 1000, "2021-01-05"),
- Seq(2, "a2", 20.0, 2000, "2021-01-06"),
- Seq(3, "a3", 30.0, 3000, "2021-01-07")
- )
+ checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+ Seq(1, "a1", 10.0, 1000, "2021-01-05"),
+ Seq(2, "a2", 20.0, 2000, "2021-01-06"),
+ Seq(3, "a3", 30.0, 3000, "2021-01-07")
+ )
+ }
}
}
test("Test Insert Into with dynamic partition") {
- withTempDir { tmp =>
- val tableName = generateTableName
- // Create a partitioned table
- spark.sql(
- s"""
- |create table $tableName (
- | id int,
- | dt string,
- | name string,
- | price double,
- | ts long
- |) using hudi
- | tblproperties (primaryKey = 'id')
- | partitioned by (dt)
- | location '${tmp.getCanonicalPath}'
+ Seq("cow", "mor").foreach { tableType =>
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ // Create a partitioned table
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | dt string,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | tblproperties (primaryKey = 'id', type = '$tableType')
+ | partitioned by (dt)
+ | location '${tmp.getCanonicalPath}'
""".stripMargin)
- // Insert into dynamic partition
- spark.sql(
- s"""
- | insert into $tableName partition(dt)
- | select 1 as id, '2021-01-05' as dt, 'a1' as name, 10 as price,
1000 as ts
+ // Insert into dynamic partition
+ spark.sql(
+ s"""
+ | insert into $tableName partition(dt)
+ | select 1 as id, '2021-01-05' as dt, 'a1' as name, 10 as price,
1000 as ts
""".stripMargin)
- // should not mess with the original order after write the out-of-order
data.
- val metaClient = createMetaClient(spark, tmp.getCanonicalPath)
- val schema = HoodieSqlCommonUtils.getTableSqlSchema(metaClient).get
- assert(schema.getFieldIndex("id").contains(0))
- assert(schema.getFieldIndex("price").contains(2))
+ // should not mess with the original order after write the
out-of-order data.
+ val metaClient = createMetaClient(spark, tmp.getCanonicalPath)
+ val schema = HoodieSqlCommonUtils.getTableSqlSchema(metaClient).get
+ assert(schema.getFieldIndex("id").contains(0))
+ assert(schema.getFieldIndex("price").contains(2))
- spark.sql(
- s"""
- | insert into $tableName
- | select 2 as id, 'a2' as name, 20 as price, 2000 as ts,
'2021-01-06' as dt
+ spark.sql(
+ s"""
+ | insert into $tableName
+ | select 2 as id, 'a2' as name, 20 as price, 2000 as ts,
'2021-01-06' as dt
""".stripMargin)
- // Note: Do not write the field alias, the partition field must be
placed last.
- spark.sql(
- s"""
- | insert into $tableName
- | select 3, 'a3', 30, 3000, '2021-01-07'
+ // Note: Do not write the field alias, the partition field must be
placed last.
+ spark.sql(
+ s"""
+ | insert into $tableName
+ | select 3, 'a3', 30, 3000, '2021-01-07'
""".stripMargin)
- checkAnswer(s"select id, name, price, ts, dt from $tableName")(
- Seq(1, "a1", 10.0, 1000, "2021-01-05"),
- Seq(2, "a2", 20.0, 2000, "2021-01-06"),
- Seq(3, "a3", 30.0, 3000, "2021-01-07")
- )
+ checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+ Seq(1, "a1", 10.0, 1000, "2021-01-05"),
+ Seq(2, "a2", 20.0, 2000, "2021-01-06"),
+ Seq(3, "a3", 30.0, 3000, "2021-01-07")
+ )
+ }
}
}
test("Test Insert Into with multi partition") {
- withRecordType()(withTempDir { tmp =>
- val tableName = generateTableName
- // Create a partitioned table
- spark.sql(
- s"""
- |create table $tableName (
- | id int,
- | dt string,
- | name string,
- | price double,
- | ht string,
- | ts long
- |) using hudi
- | tblproperties (primaryKey = 'id')
- | partitioned by (dt, ht)
- | location '${tmp.getCanonicalPath}'
+ Seq("cow", "mor").foreach { tableType =>
+ withRecordType()(withTempDir { tmp =>
+ val tableName = generateTableName
+ // Create a partitioned table
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | dt string,
+ | name string,
+ | price double,
+ | ht string,
+ | ts long
+ |) using hudi
+ | tblproperties (primaryKey = 'id', type = '$tableType')
+ | partitioned by (dt, ht)
+ | location '${tmp.getCanonicalPath}'
""".stripMargin)
- spark.sql(
- s"""
- | insert into $tableName partition(dt, ht)
- | select 1 as id, 'a1' as name, 10 as price,'20210101' as dt, 1000
as ts, '01' as ht
+ spark.sql(
+ s"""
+ | insert into $tableName partition(dt, ht)
+ | select 1 as id, 'a1' as name, 10 as price,'20210101' as dt,
1000 as ts, '01' as ht
""".stripMargin)
- // Insert into static partition and dynamic partition
- spark.sql(
- s"""
- | insert into $tableName partition(dt = '20210102', ht)
- | select 2 as id, 'a2' as name, 20 as price, 2000 as ts, '02' as ht
+ // Insert into static partition and dynamic partition
+ spark.sql(
+ s"""
+ | insert into $tableName partition(dt = '20210102', ht)
+ | select 2 as id, 'a2' as name, 20 as price, 2000 as ts, '02' as
ht
""".stripMargin)
- spark.sql(
- s"""
- | insert into $tableName partition(dt, ht = '03')
- | select 3 as id, 'a3' as name, 30 as price, 3000 as ts, '20210103'
as dt
+ spark.sql(
+ s"""
+ | insert into $tableName partition(dt, ht = '03')
+ | select 3 as id, 'a3' as name, 30 as price, 3000 as ts,
'20210103' as dt
""".stripMargin)
- // Note: Do not write the field alias, the partition field must be
placed last.
- spark.sql(
- s"""
- | insert into $tableName
- | select 4, 'a4', 40, 4000, '20210104', '04'
+ // Note: Do not write the field alias, the partition field must be
placed last.
+ spark.sql(
+ s"""
+ | insert into $tableName
+ | select 4, 'a4', 40, 4000, '20210104', '04'
""".stripMargin)
- checkAnswer(s"select id, name, price, ts, dt, ht from $tableName")(
- Seq(1, "a1", 10.0, 1000, "20210101", "01"),
- Seq(2, "a2", 20.0, 2000, "20210102", "02"),
- Seq(3, "a3", 30.0, 3000, "20210103", "03"),
- Seq(4, "a4", 40.0, 4000, "20210104", "04")
- )
- })
+ checkAnswer(s"select id, name, price, ts, dt, ht from $tableName")(
+ Seq(1, "a1", 10.0, 1000, "20210101", "01"),
+ Seq(2, "a2", 20.0, 2000, "20210102", "02"),
+ Seq(3, "a3", 30.0, 3000, "20210103", "03"),
+ Seq(4, "a4", 40.0, 4000, "20210104", "04")
+ )
+ })
+ }
}
- test("Test Insert Into None Partitioned Table") {
+ test("Test Insert Into Non Partitioned Table") {
withRecordType(Seq(HoodieRecordType.AVRO, HoodieRecordType.SPARK),
Map(HoodieRecordType.SPARK ->
// SparkMerger should use "HoodieSparkValidateDuplicateKeyRecordMerger"
// with "hoodie.sql.insert.mode=strict"
@@ -851,7 +857,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
}
- test("Test Insert timestamp when 'spark.sql.datetime.java8API.enabled'
enables") {
+ test("Test Insert timestamp when 'spark.sql.datetime.java8API.enabled'
enabled") {
withRecordType() {
withSQLConf("spark.sql.datetime.java8API.enabled" -> "true") {
val tableName = generateTableName
@@ -1291,64 +1297,70 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
}
test("Test combine before insert") {
- withSQLConf("hoodie.sql.bulk.insert.enable" -> "false",
"hoodie.merge.allow.duplicate.on.inserts" -> "false") {
- withRecordType()(withTempDir{tmp =>
- val tableName = generateTableName
- spark.sql(
- s"""
- |create table $tableName (
- | id int,
- | name string,
- | price double,
- | ts long
- |) using hudi
- | location '${tmp.getCanonicalPath}/$tableName'
- | tblproperties (
- | primaryKey = 'id',
- | preCombineField = 'ts'
- | )
+ Seq("cow", "mor").foreach { tableType =>
+ withSQLConf("hoodie.sql.bulk.insert.enable" -> "false",
"hoodie.merge.allow.duplicate.on.inserts" -> "false") {
+ withRecordType()(withTempDir { tmp =>
+ val tableName = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | location '${tmp.getCanonicalPath}/$tableName'
+ | tblproperties (
+ | primaryKey = 'id',
+ | type = '$tableType',
+ | preCombineField = 'ts'
+ | )
""".stripMargin)
- spark.sql(
- s"""
- |insert overwrite table $tableName
- |select * from (
- | select 1 as id, 'a1' as name, 10 as price, 1000 as ts
- | union all
- | select 1 as id, 'a1' as name, 11 as price, 1001 as ts
- | )
- |""".stripMargin
- )
- checkAnswer(s"select id, name, price, ts from $tableName")(
- Seq(1, "a1", 11.0, 1001)
- )
- })
+ spark.sql(
+ s"""
+ |insert overwrite table $tableName
+ |select * from (
+ | select 1 as id, 'a1' as name, 10 as price, 1000 as ts
+ | union all
+ | select 1 as id, 'a1' as name, 11 as price, 1001 as ts
+ | )
+ |""".stripMargin
+ )
+ checkAnswer(s"select id, name, price, ts from $tableName")(
+ Seq(1, "a1", 11.0, 1001)
+ )
+ })
+ }
}
}
test("Test insert pk-table") {
- withSQLConf("hoodie.sql.bulk.insert.enable" -> "false") {
- withRecordType()(withTempDir{tmp =>
- val tableName = generateTableName
- spark.sql(
- s"""
- |create table $tableName (
- | id int,
- | name string,
- | price double,
- | ts long
- |) using hudi
- | location '${tmp.getCanonicalPath}/$tableName'
- | tblproperties (
- | primaryKey = 'id',
- | preCombineField = 'ts'
- | )
+ Seq("cow", "mor").foreach { tableType =>
+ withSQLConf("hoodie.sql.bulk.insert.enable" -> "false") {
+ withRecordType()(withTempDir { tmp =>
+ val tableName = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | location '${tmp.getCanonicalPath}/$tableName'
+ | tblproperties (
+ | primaryKey = 'id',
+ | type = '$tableType',
+ | preCombineField = 'ts'
+ | )
""".stripMargin)
- spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
- spark.sql(s"insert into $tableName values(1, 'a1', 11, 1000)")
- checkAnswer(s"select id, name, price, ts from $tableName")(
- Seq(1, "a1", 11.0, 1000)
- )
- })
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+ spark.sql(s"insert into $tableName values(1, 'a1', 11, 1000)")
+ checkAnswer(s"select id, name, price, ts from $tableName")(
+ Seq(1, "a1", 11.0, 1000)
+ )
+ })
+ }
}
}
@@ -1952,39 +1964,41 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
}
test("Test Insert Into with auto generate record keys with precombine ") {
- withTempDir { tmp =>
- val tableName = generateTableName
- // Create a partitioned table
- spark.sql(
- s"""
- |create table $tableName (
- | id int,
- | dt string,
- | name string,
- | price double,
- | ts long
- |) using hudi
- | partitioned by (dt)
- | location '${tmp.getCanonicalPath}'
- | tblproperties (
- | type = 'cow',
- | preCombineField = 'price'
- | )
+ Seq("cow", "mor").foreach { tableType =>
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ // Create a partitioned table
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | dt string,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | partitioned by (dt)
+ | location '${tmp.getCanonicalPath}'
+ | tblproperties (
+ | type = '$tableType',
+ | preCombineField = 'price'
+ | )
""".stripMargin)
- spark.sql(
- s"""
- | insert into $tableName values
- | (1, 'a1', 10, 1000, "2021-01-05"),
- | (2, 'a2', 20, 2000, "2021-01-06"),
- | (3, 'a3', 30, 3000, "2021-01-07")
+ spark.sql(
+ s"""
+ | insert into $tableName values
+ | (1, 'a1', 10, 1000, "2021-01-05"),
+ | (2, 'a2', 20, 2000, "2021-01-06"),
+ | (3, 'a3', 30, 3000, "2021-01-07")
""".stripMargin)
- checkAnswer(s"select id, name, price, ts, dt from $tableName")(
- Seq(1, "a1", 10.0, 1000, "2021-01-05"),
- Seq(2, "a2", 20.0, 2000, "2021-01-06"),
- Seq(3, "a3", 30.0, 3000, "2021-01-07")
- )
+ checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+ Seq(1, "a1", 10.0, 1000, "2021-01-05"),
+ Seq(2, "a2", 20.0, 2000, "2021-01-06"),
+ Seq(3, "a3", 30.0, 3000, "2021-01-07")
+ )
+ }
}
}
@@ -2527,7 +2541,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
test("Test DROP insert dup policy with INSERT_INTO explicit new configs BULK
INSERT operation") {
withRecordType(Seq(HoodieRecordType.AVRO))(withTempDir { tmp =>
- Seq("cow").foreach { tableType =>
+ Seq("cow", "mor").foreach { tableType =>
val dupPolicy = DROP_INSERT_DUP_POLICY
withTable(generateTableName) { tableName =>
ingestAndValidateDropDupPolicyBulkInsert(tableType, tableName, tmp,
@@ -2540,7 +2554,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
test("Test FAIL insert dup policy with INSERT_INTO explicit new configs") {
withRecordType(Seq(HoodieRecordType.AVRO))(withTempDir { tmp =>
- Seq("cow").foreach { tableType =>
+ Seq("cow", "mor").foreach { tableType =>
val operation = WriteOperationType.UPSERT
val dupPolicy = FAIL_INSERT_DUP_POLICY
withTable(generateTableName) { tableName =>
@@ -2592,7 +2606,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
test(s"Test INSERT INTO with upsert operation type") {
if (HoodieSparkUtils.gteqSpark3_3) {
withTempDir { tmp =>
- Seq("mor").foreach { tableType =>
+ Seq("cow", "mor").foreach { tableType =>
val tableName = generateTableName
spark.sql(
s"""
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable.scala
index 9da5a1a6818..1c70f70aaa0 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable.scala
@@ -135,7 +135,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
val targetTable = generateTableName
spark.sql(
s"""
- |create table ${targetTable} (
+ |create table $targetTable (
| id int,
| name string,
| data int,
@@ -152,7 +152,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
|""".stripMargin)
spark.sql(
s"""
- |merge into ${targetTable} as target
+ |merge into $targetTable as target
|using (
|select 1 as id, 'lb' as name, 6 as data, 'shu' as country,
1646643193 as ts
|) source
@@ -164,7 +164,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
|""".stripMargin)
spark.sql(
s"""
- |merge into ${targetTable} as target
+ |merge into $targetTable as target
|using (
|select 1 as id, 'lb' as name, 5 as data, 'shu' as country,
1646643196 as ts
|) source
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestTimeTravelTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestTimeTravelTable.scala
index ab5657d8d3a..972d246e574 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestTimeTravelTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestTimeTravelTable.scala
@@ -24,116 +24,120 @@ import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
class TestTimeTravelTable extends HoodieSparkSqlTestBase {
test("Test Insert and Update Record with time travel") {
- if (HoodieSparkUtils.gteqSpark3_3) {
- withRecordType()(withTempDir { tmp =>
- val tableName1 = generateTableName
- spark.sql(
- s"""
- |create table $tableName1 (
- | id int,
- | name string,
- | price double,
- | ts long
- |) using hudi
- | tblproperties (
- | type = 'cow',
- | primaryKey = 'id',
- | preCombineField = 'ts'
- | )
- | location '${tmp.getCanonicalPath}/$tableName1'
+ Seq("cow", "mor").foreach { tableType =>
+ if (HoodieSparkUtils.gteqSpark3_3) {
+ withRecordType()(withTempDir { tmp =>
+ val tableName1 = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableName1 (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | tblproperties (
+ | type = '$tableType',
+ | primaryKey = 'id',
+ | preCombineField = 'ts'
+ | )
+ | location '${tmp.getCanonicalPath}/$tableName1'
""".stripMargin)
- // 1st commit instant
- spark.sql(s"insert into $tableName1 values(1, 'a1', 10, 1000)")
+ // 1st commit instant
+ spark.sql(s"insert into $tableName1 values(1, 'a1', 10, 1000)")
- val metaClient1 = createMetaClient(spark, s"${tmp.getCanonicalPath}/$tableName1")
- val instant1 = metaClient1.getActiveTimeline.getAllCommitsTimeline
- .lastInstant().get().getTimestamp
+ val metaClient1 = createMetaClient(spark, s"${tmp.getCanonicalPath}/$tableName1")
+ val instant1 = metaClient1.getActiveTimeline.getAllCommitsTimeline
+ .lastInstant().get().getTimestamp
- // 2nd commit instant
- spark.sql(s"insert into $tableName1 values(1, 'a2', 20, 2000)")
+ // 2nd commit instant
+ spark.sql(s"insert into $tableName1 values(1, 'a2', 20, 2000)")
- checkAnswer(s"select id, name, price, ts from $tableName1")(
- Seq(1, "a2", 20.0, 2000)
- )
+ checkAnswer(s"select id, name, price, ts from $tableName1")(
+ Seq(1, "a2", 20.0, 2000)
+ )
- // time travel as of instant 1
- checkAnswer(
- s"select id, name, price, ts from $tableName1 TIMESTAMP AS OF '$instant1'")(
- Seq(1, "a1", 10.0, 1000)
- )
- })
+ // time travel as of instant 1
+ checkAnswer(
+ s"select id, name, price, ts from $tableName1 TIMESTAMP AS OF '$instant1'")(
+ Seq(1, "a1", 10.0, 1000)
+ )
+ })
+ }
}
}
test("Test Insert Into Records with time travel To new Table") {
- if (HoodieSparkUtils.gteqSpark3_3) {
- withTempDir { tmp =>
- // Create Non-Partitioned table
- val tableName1 = generateTableName
- spark.sql(
- s"""
- |create table $tableName1 (
- | id int,
- | name string,
- | price double,
- | ts long
- |) using hudi
- | tblproperties (
- | type = 'cow',
- | primaryKey = 'id',
- | preCombineField = 'ts'
- | )
- | location '${tmp.getCanonicalPath}/$tableName1'
+ Seq("cow", "mor").foreach { tableType =>
+ if (HoodieSparkUtils.gteqSpark3_3) {
+ withTempDir { tmp =>
+ // Create Non-Partitioned table
+ val tableName1 = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableName1 (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | tblproperties (
+ | type = '$tableType',
+ | primaryKey = 'id',
+ | preCombineField = 'ts'
+ | )
+ | location '${tmp.getCanonicalPath}/$tableName1'
""".stripMargin)
- spark.sql(s"insert into $tableName1 values(1, 'a1', 10, 1000)")
+ spark.sql(s"insert into $tableName1 values(1, 'a1', 10, 1000)")
- val metaClient1 = createMetaClient(spark, s"${tmp.getCanonicalPath}/$tableName1")
+ val metaClient1 = createMetaClient(spark, s"${tmp.getCanonicalPath}/$tableName1")
- val instant1 = metaClient1.getActiveTimeline.getAllCommitsTimeline
- .lastInstant().get().getTimestamp
+ val instant1 = metaClient1.getActiveTimeline.getAllCommitsTimeline
+ .lastInstant().get().getTimestamp
- val tableName2 = generateTableName
- // Create a partitioned table
- spark.sql(
- s"""
- |create table $tableName2 (
- | id int,
- | name string,
- | price double,
- | ts long,
- | dt string
- |) using hudi
- | tblproperties (primaryKey = 'id')
- | partitioned by (dt)
- | location '${tmp.getCanonicalPath}/$tableName2'
+ val tableName2 = generateTableName
+ // Create a partitioned table
+ spark.sql(
+ s"""
+ |create table $tableName2 (
+ | id int,
+ | name string,
+ | price double,
+ | ts long,
+ | dt string
+ |) using hudi
+ | tblproperties (primaryKey = 'id', type = '$tableType')
+ | partitioned by (dt)
+ | location '${tmp.getCanonicalPath}/$tableName2'
""".stripMargin)
- // Insert into dynamic partition
- spark.sql(
- s"""
- | insert into $tableName2
- | select id, name, price, ts, '2022-02-14' as dt
- | from $tableName1 TIMESTAMP AS OF '$instant1'
+ // Insert into dynamic partition
+ spark.sql(
+ s"""
+ | insert into $tableName2
+ | select id, name, price, ts, '2022-02-14' as dt
+ | from $tableName1 TIMESTAMP AS OF '$instant1'
""".stripMargin)
- checkAnswer(s"select id, name, price, ts, dt from $tableName2")(
- Seq(1, "a1", 10.0, 1000, "2022-02-14")
- )
+ checkAnswer(s"select id, name, price, ts, dt from $tableName2")(
+ Seq(1, "a1", 10.0, 1000, "2022-02-14")
+ )
- // Insert into static partition
- spark.sql(
- s"""
- | insert into $tableName2 partition(dt = '2022-02-15')
- | select 2 as id, 'a2' as name, price, ts
- | from $tableName1 TIMESTAMP AS OF '$instant1'
+ // Insert into static partition
+ spark.sql(
+ s"""
+ | insert into $tableName2 partition(dt = '2022-02-15')
+ | select 2 as id, 'a2' as name, price, ts
+ | from $tableName1 TIMESTAMP AS OF '$instant1'
""".stripMargin)
- checkAnswer(
- s"select id, name, price, ts, dt from $tableName2")(
- Seq(1, "a1", 10.0, 1000, "2022-02-14"),
- Seq(2, "a2", 10.0, 1000, "2022-02-15")
- )
+ checkAnswer(
+ s"select id, name, price, ts, dt from $tableName2")(
+ Seq(1, "a1", 10.0, 1000, "2022-02-14"),
+ Seq(2, "a2", 10.0, 1000, "2022-02-15")
+ )
+ }
}
}
}
@@ -238,43 +242,45 @@ class TestTimeTravelTable extends HoodieSparkSqlTestBase {
test("Test Select Record with time travel and Repartition") {
if (HoodieSparkUtils.gteqSpark3_3) {
- withTempDir { tmp =>
- val tableName = generateTableName
- spark.sql(
- s"""
- |create table $tableName (
- | id int,
- | name string,
- | price double,
- | ts long
- |) using hudi
- | tblproperties (
- | type = 'cow',
- | primaryKey = 'id',
- | preCombineField = 'ts'
- | )
- | location '${tmp.getCanonicalPath}/$tableName'
+ Seq("cow", "mor").foreach { tableType =>
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | tblproperties (
+ | type = '$tableType',
+ | primaryKey = 'id',
+ | preCombineField = 'ts'
+ | )
+ | location '${tmp.getCanonicalPath}/$tableName'
""".stripMargin)
- // 1st commit instant
- spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+ // 1st commit instant
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
- val metaClient = createMetaClient(spark, s"${tmp.getCanonicalPath}/$tableName")
- val instant1 = metaClient.getActiveTimeline.getAllCommitsTimeline
- .lastInstant().get().getTimestamp
+ val metaClient = createMetaClient(spark, s"${tmp.getCanonicalPath}/$tableName")
+ val instant1 = metaClient.getActiveTimeline.getAllCommitsTimeline
+ .lastInstant().get().getTimestamp
- // 2nd commit instant
- spark.sql(s"insert into $tableName values(1, 'a2', 20, 2000)")
+ // 2nd commit instant
+ spark.sql(s"insert into $tableName values(1, 'a2', 20, 2000)")
- checkAnswer(s"select id, name, price, ts from $tableName distribute by cast(rand() * 2 as int)")(
- Seq(1, "a2", 20.0, 2000)
- )
+ checkAnswer(s"select id, name, price, ts from $tableName distribute by cast(rand() * 2 as int)")(
+ Seq(1, "a2", 20.0, 2000)
+ )
- // time travel as of instant 1
- checkAnswer(
- s"select id, name, price, ts from $tableName TIMESTAMP AS OF '$instant1' distribute by cast(rand() * 2 as int)")(
- Seq(1, "a1", 10.0, 1000)
- )
+ // time travel as of instant 1
+ checkAnswer(
+ s"select id, name, price, ts from $tableName TIMESTAMP AS OF '$instant1' distribute by cast(rand() * 2 as int)")(
+ Seq(1, "a1", 10.0, 1000)
+ )
+ }
}
}
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestUpdateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestUpdateTable.scala
index 5162b664880..56b3eb7f315 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestUpdateTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestUpdateTable.scala
@@ -18,10 +18,12 @@
package org.apache.spark.sql.hudi.dml
import org.apache.hudi.DataSourceWriteOptions.SPARK_SQL_OPTIMIZED_WRITES
+import org.apache.hudi.HoodieCLIUtils
import org.apache.hudi.HoodieSparkUtils.isSpark2
import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.timeline.HoodieInstant
+import org.apache.hudi.common.util.{Option => HOption}
import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
-
import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
import org.junit.jupiter.api.Assertions.assertEquals
@@ -316,4 +318,63 @@ class TestUpdateTable extends HoodieSparkSqlTestBase {
}
}
}
+
+ // Test update table with clustering
+ test("Test Update Table with Clustering") {
+ withTempDir { tmp =>
+ Seq("cow", "mor").foreach { tableType =>
+ val tableName = generateTableName
+ val basePath = s"${tmp.getCanonicalPath}/$tableName"
+ // create table
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long,
+ | partition long
+ |) using hudi
+ | partitioned by (partition)
+ | location '$basePath'
+ | tblproperties (
+ | type = '$tableType',
+ | primaryKey = 'id',
+ | preCombineField = 'ts',
+ | hoodie.clustering.keygen.class = "org.apache.hudi.keygen.NonpartitionedKeyGenerator"
+ | )
+ """.stripMargin)
+
+ // insert data
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000, 1000)")
+ spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001, 1001)")
+ spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002, 1002)")
+
+ // update data
+ spark.sql(s"update $tableName set price = 20 where id > 1")
+ checkAnswer(s"select id, name, price, ts from $tableName where id > 1")(
+ Seq(2, "a2", 20.0, 1001),
+ Seq(3, "a3", 20.0, 1002)
+ )
+
+ // update data
+ spark.sql(s"update $tableName set price = price * 2 where id = 2")
+ checkAnswer(s"select id, name, price, ts from $tableName where id = 2")(
+ Seq(2, "a2", 40.0, 1001)
+ )
+
+ val client = HoodieCLIUtils.createHoodieWriteClient(spark, basePath, Map.empty, Option(tableName))
+ // Generate the first clustering plan
+ val firstScheduleInstant = client.createNewInstantTime()
+ client.scheduleClusteringAtInstant(firstScheduleInstant, HOption.empty())
+ checkAnswer(s"call show_clustering(path => '$basePath', show_involved_partition => true)")(
+ Seq(firstScheduleInstant, 3, HoodieInstant.State.REQUESTED.name(), "partition=1000,partition=1001,partition=1002")
+ )
+ // Do clustering for all the clustering plan
+ checkAnswer(s"call run_clustering(path => '$basePath', order => 'partition')")(
+ Seq(firstScheduleInstant, 3, HoodieInstant.State.COMPLETED.name(), "*")
+ )
+ }
+ }
+ }
}