This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 80c3f033b54 [MINOR] Separate HoodieSparkWriterTestBase to reduce duplication (#10832)
80c3f033b54 is described below
commit 80c3f033b542f1011cbdead8a184d4753e98ca85
Author: Geser Dugarov <[email protected]>
AuthorDate: Fri Mar 8 07:52:52 2024 +0700
[MINOR] Separate HoodieSparkWriterTestBase to reduce duplication (#10832)
---
.../apache/hudi/HoodieSparkWriterTestBase.scala | 136 ++++++++++++++++++
.../org/apache/hudi/TestHoodieSparkSqlWriter.scala | 152 +++------------------
.../apache/hudi/TestHoodieSparkSqlWriterUtc.scala | 2 +-
.../hudi/TestTableSchemaResolverWithSparkSQL.scala | 102 +-------------
4 files changed, 162 insertions(+), 230 deletions(-)
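
In short: the SparkSession lifecycle, temp-path handling, common writer params, and row-conversion helpers that were duplicated across three suites now live in a single base class. A minimal sketch of the resulting structure (bodies elided; the inheritance relationships shown are exactly those introduced by this commit):

    // Shared fixture extracted by this commit (full body in the diff below).
    class HoodieSparkWriterTestBase {
      @BeforeEach def setUp(): Unit = { /* init Spark, create temp dirs */ }
      @AfterEach def tearDown(): Unit = { /* stop Spark, delete temp dirs */ }
      // plus getCommonParams, dropMetaFields, convertRowListToSeq
    }

    // The three suites now inherit the fixture instead of duplicating it:
    class TestHoodieSparkSqlWriter extends HoodieSparkWriterTestBase
    class TestHoodieSparkSqlWriterUtc extends HoodieSparkWriterTestBase
    class TestTableSchemaResolverWithSparkSQL extends HoodieSparkWriterTestBase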
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkWriterTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkWriterTestBase.scala
new file mode 100644
index 00000000000..c0c1c2c12bd
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkWriterTestBase.scala
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.commons.io.FileUtils
+import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType}
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.testutils.HoodieClientTestUtils
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
+import org.apache.spark.sql.{Dataset, Row, SQLContext, SparkSession}
+import org.junit.jupiter.api.{AfterEach, BeforeEach}
+
+import scala.collection.JavaConverters
+
+class HoodieSparkWriterTestBase {
+ var spark: SparkSession = _
+ var sqlContext: SQLContext = _
+ var sc: SparkContext = _
+ var tempPath: java.nio.file.Path = _
+ var tempBootStrapPath: java.nio.file.Path = _
+ var hoodieFooTableName = "hoodie_foo_tbl"
+ var tempBasePath: String = _
+ var commonTableModifier: Map[String, String] = Map()
+
+ case class StringLongTest(uuid: String, ts: Long)
+
+ /**
+ * Setup method running before each test.
+ */
+ @BeforeEach
+ def setUp(): Unit = {
+ initSparkContext()
+ tempPath = java.nio.file.Files.createTempDirectory("hoodie_test_path")
+ tempBootStrapPath = java.nio.file.Files.createTempDirectory("hoodie_test_bootstrap")
+ tempBasePath = tempPath.toAbsolutePath.toString
+ commonTableModifier = getCommonParams(tempPath, hoodieFooTableName, HoodieTableType.COPY_ON_WRITE.name())
+ }
+
+ /**
+ * Tear down method running after each test.
+ */
+ @AfterEach
+ def tearDown(): Unit = {
+ cleanupSparkContexts()
+ FileUtils.deleteDirectory(tempPath.toFile)
+ FileUtils.deleteDirectory(tempBootStrapPath.toFile)
+ }
+
+ /**
+ * Utility method for initializing the spark context.
+ */
+ def initSparkContext(): Unit = {
+ val sparkConf = HoodieClientTestUtils.getSparkConfForTest(getClass.getSimpleName)
+
+ spark = SparkSession.builder()
+ .withExtensions(new HoodieSparkSessionExtension)
+ .config(sparkConf)
+ .getOrCreate()
+
+ sc = spark.sparkContext
+ sc.setLogLevel("ERROR")
+ sqlContext = spark.sqlContext
+ }
+
+ /**
+ * Utility method for cleaning up spark resources.
+ */
+ def cleanupSparkContexts(): Unit = {
+ if (sqlContext != null) {
+ sqlContext.clearCache();
+ sqlContext = null;
+ }
+ if (sc != null) {
+ sc.stop()
+ sc = null
+ }
+ if (spark != null) {
+ spark.close()
+ }
+ }
+
+ /**
+ * Utility method for creating common params for writer.
+ *
+ * @param path Path for hoodie table
+ * @param hoodieFooTableName Name of hoodie table
+ * @param tableType Type of table
+ * @return Map of common params
+ */
+ def getCommonParams(path: java.nio.file.Path, hoodieFooTableName: String, tableType: String): Map[String, String] = {
+ Map("path" -> path.toAbsolutePath.toString,
+ HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
+ "hoodie.insert.shuffle.parallelism" -> "1",
+ "hoodie.upsert.shuffle.parallelism" -> "1",
+ DataSourceWriteOptions.TABLE_TYPE.key -> tableType,
+ DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
+ DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
+ DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.SimpleKeyGenerator")
+ }
+
+ /**
+ * Utility method for dropping all hoodie meta related columns.
+ */
+ def dropMetaFields(df: Dataset[Row]): Dataset[Row] = {
+ df.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
+ .drop(HoodieRecord.HOODIE_META_COLUMNS.get(2)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(3))
+ .drop(HoodieRecord.HOODIE_META_COLUMNS.get(4))
+ }
+
+ /**
+ * Utility method for converting list of Row to list of Seq.
+ *
+ * @param inputList list of Row
+ * @return list of Seq
+ */
+ def convertRowListToSeq(inputList: java.util.List[Row]): Seq[Row] =
+ JavaConverters.asScalaIteratorConverter(inputList.iterator).asScala.toSeq
+
+}
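
For illustration, a suite built on this base class only has to supply test bodies; everything else is inherited. The test below is a hypothetical sketch, not part of this commit: the suite name and DataFrame shape are invented, while spark, sqlContext, tempBasePath, commonTableModifier and dropMetaFields come from the base class above, and HoodieSparkSqlWriter.write is the entry point exercised throughout these suites.

    import org.apache.spark.sql.SaveMode
    import org.junit.jupiter.api.Test
    import org.junit.jupiter.api.Assertions.assertEquals

    class TestMyWriterFeature extends HoodieSparkWriterTestBase { // hypothetical suite
      @Test
      def testWriteWithCommonParams(): Unit = {
        // Build a frame matching the key/partition fields set in getCommonParams.
        val df = spark.range(0, 10)
          .selectExpr("cast(id as string) as _row_key", "'p1' as partition", "id as ts")
        HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, commonTableModifier, df)
        // Read back and strip the _hoodie_* meta columns before comparing.
        val result = dropMetaFields(spark.read.format("hudi").load(tempBasePath))
        assertEquals(10L, result.count())
      }
    }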
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index d7a1f9331ae..0767d055915 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -19,10 +19,8 @@ package org.apache.hudi
import org.apache.avro.Schema
import org.apache.commons.io.FileUtils
-import org.apache.hudi.DataSourceWriteOptions._
-import org.apache.hudi.HoodieSparkUtils.gteqSpark3_0
import org.apache.hudi.client.SparkRDDWriteClient
-import org.apache.hudi.common.model._
+import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord, HoodieRecordPayload, HoodieTableType, WriteOperationType}
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieIndexConfig, HoodieWriteConfig}
@@ -30,19 +28,15 @@ import org.apache.hudi.exception.{HoodieException, SchemaCompatibilityException}
import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode
import org.apache.hudi.functional.TestBootstrap
import org.apache.hudi.keygen.{ComplexKeyGenerator, NonpartitionedKeyGenerator, SimpleKeyGenerator}
-import org.apache.hudi.testutils.DataSourceTestUtils
-import org.apache.hudi.testutils.HoodieClientTestUtils.getSparkConfForTest
-import org.apache.spark.SparkContext
+import org.apache.hudi.testutils.{DataSourceTestUtils, HoodieClientTestUtils}
import org.apache.spark.api.java.JavaSparkContext
-import org.apache.spark.sql._
+import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
import org.apache.spark.sql.functions.{expr, lit}
-import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
import org.apache.spark.sql.hudi.command.SqlKeyGenerator
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotNull, assertNull, assertTrue, fail}
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.Arguments.arguments
-import org.junit.jupiter.params.provider._
+import org.junit.jupiter.params.provider.{Arguments, CsvSource, EnumSource, MethodSource, ValueSource}
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.{spy, times, verify}
import org.scalatest.Assertions.assertThrows
@@ -52,7 +46,6 @@ import java.io.IOException
import java.time.Instant
import java.util.{Collections, Date, UUID}
import scala.collection.JavaConversions._
-import scala.collection.JavaConverters
/**
* Test suite for SparkSqlWriter class.
@@ -60,113 +53,10 @@ import scala.collection.JavaConverters
* Otherwise UTC tests will generate infinite loops if any test was initiated with a time zone greater than UTC+0.
* The reason is a value saved on the heap by the static {@link org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator.lastInstantTime}.
*/
-class TestHoodieSparkSqlWriter {
- var spark: SparkSession = _
- var sqlContext: SQLContext = _
- var sc: SparkContext = _
- var tempPath: java.nio.file.Path = _
- var tempBootStrapPath: java.nio.file.Path = _
- var hoodieFooTableName = "hoodie_foo_tbl"
- var tempBasePath: String = _
- var commonTableModifier: Map[String, String] = Map()
- case class StringLongTest(uuid: String, ts: Long)
+class TestHoodieSparkSqlWriter extends HoodieSparkWriterTestBase {
/**
- * Setup method running before each test.
- */
- @BeforeEach
- def setUp(): Unit = {
- initSparkContext()
- tempPath = java.nio.file.Files.createTempDirectory("hoodie_test_path")
- tempBootStrapPath = java.nio.file.Files.createTempDirectory("hoodie_test_bootstrap")
- tempBasePath = tempPath.toAbsolutePath.toString
- commonTableModifier = getCommonParams(tempPath, hoodieFooTableName, HoodieTableType.COPY_ON_WRITE.name())
- }
-
- /**
- * Tear down method running after each test.
- */
- @AfterEach
- def tearDown(): Unit = {
- cleanupSparkContexts()
- FileUtils.deleteDirectory(tempPath.toFile)
- FileUtils.deleteDirectory(tempBootStrapPath.toFile)
- }
-
- /**
- * Utility method for initializing the spark context.
- *
- * TODO rebase this onto existing base class to avoid duplication
- */
- def initSparkContext(): Unit = {
- val sparkConf = getSparkConfForTest(getClass.getSimpleName)
-
- spark = SparkSession.builder()
- .withExtensions(new HoodieSparkSessionExtension)
- .config(sparkConf)
- .getOrCreate()
-
- sc = spark.sparkContext
- sc.setLogLevel("ERROR")
- sqlContext = spark.sqlContext
- }
-
- /**
- * Utility method for cleaning up spark resources.
- */
- def cleanupSparkContexts(): Unit = {
- if (sqlContext != null) {
- sqlContext.clearCache();
- sqlContext = null;
- }
- if (sc != null) {
- sc.stop()
- sc = null
- }
- if (spark != null) {
- spark.close()
- }
- }
-
- /**
- * Utility method for dropping all hoodie meta related columns.
- */
- def dropMetaFields(df: Dataset[Row]): Dataset[Row] = {
- df.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
- .drop(HoodieRecord.HOODIE_META_COLUMNS.get(2)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(3))
- .drop(HoodieRecord.HOODIE_META_COLUMNS.get(4))
- }
-
- /**
- * Utility method for creating common params for writer.
- *
- * @param path Path for hoodie table
- * @param hoodieFooTableName Name of hoodie table
- * @param tableType Type of table
- * @return Map of common params
- */
- def getCommonParams(path: java.nio.file.Path, hoodieFooTableName: String, tableType: String): Map[String, String] = {
- Map("path" -> path.toAbsolutePath.toString,
- HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
- "hoodie.insert.shuffle.parallelism" -> "1",
- "hoodie.upsert.shuffle.parallelism" -> "1",
- DataSourceWriteOptions.TABLE_TYPE.key -> tableType,
- DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
- DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
- DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.SimpleKeyGenerator")
- }
-
- /**
- * Utility method for converting list of Row to list of Seq.
- *
- * @param inputList list of Row
- * @return list of Seq
- */
- def convertRowListToSeq(inputList: java.util.List[Row]): Seq[Row] =
- JavaConverters.asScalaIteratorConverter(inputList.iterator).asScala.toSeq
-
- /**
- * Utility method for performing bulk insert tests.
+ * Local utility method for performing bulk insert tests.
*
* @param sortMode Bulk insert sort mode
* @param populateMetaFields Flag for populating meta fields
@@ -226,12 +116,13 @@ class TestHoodieSparkSqlWriter {
val originals = HoodieWriterUtils.parametersWithWriteDefaults(Map.empty)
val rhsKey = "hoodie.right.hand.side.key"
val rhsVal = "hoodie.right.hand.side.val"
- val modifier = Map(OPERATION.key -> INSERT_OPERATION_OPT_VAL, TABLE_TYPE.key -> MOR_TABLE_TYPE_OPT_VAL, rhsKey -> rhsVal)
+ val modifier = Map(DataSourceWriteOptions.OPERATION.key -> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+ DataSourceWriteOptions.TABLE_TYPE.key -> DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, rhsKey -> rhsVal)
val modified = HoodieWriterUtils.parametersWithWriteDefaults(modifier)
val matcher = (k: String, v: String) => modified(k) should be(v)
originals foreach {
- case ("hoodie.datasource.write.operation", _) =>
matcher("hoodie.datasource.write.operation", INSERT_OPERATION_OPT_VAL)
- case ("hoodie.datasource.write.table.type", _) =>
matcher("hoodie.datasource.write.table.type", MOR_TABLE_TYPE_OPT_VAL)
+ case ("hoodie.datasource.write.operation", _) =>
matcher("hoodie.datasource.write.operation",
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+ case ("hoodie.datasource.write.table.type", _) =>
matcher("hoodie.datasource.write.table.type",
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
case (`rhsKey`, _) => matcher(rhsKey, rhsVal)
case (k, v) => matcher(k, v)
}
@@ -245,7 +136,7 @@ class TestHoodieSparkSqlWriter {
spark.stop()
val session = SparkSession.builder()
// Here we intentionally remove the "spark.serializer" config to test failure
- .config(getSparkConfForTest("hoodie_test").remove("spark.serializer"))
+ .config(HoodieClientTestUtils.getSparkConfForTest("hoodie_test").remove("spark.serializer"))
.getOrCreate()
try {
val sqlContext = session.sqlContext
@@ -290,7 +181,7 @@ class TestHoodieSparkSqlWriter {
assert(tableAlreadyExistException.getMessage.contains(s"${HoodieWriteConfig.TBL_NAME.key}:\thoodie_bar_tbl\thoodie_foo_tbl"))
// on same path try append with delete operation and different ("hoodie_bar_tbl") table name which should throw an exception
- val deleteTableModifier = barTableModifier ++ Map(OPERATION.key -> "delete")
+ val deleteTableModifier = barTableModifier ++ Map(DataSourceWriteOptions.OPERATION.key -> "delete")
val deleteCmdException = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, deleteTableModifier, dataFrame2))
assert(tableAlreadyExistException.getMessage.contains("Config conflict"))
assert(tableAlreadyExistException.getMessage.contains(s"${HoodieWriteConfig.TBL_NAME.key}:\thoodie_bar_tbl\thoodie_foo_tbl"))
@@ -454,7 +345,7 @@ def testBulkInsertForDropPartitionColumn(): Unit = {
val fooTableModifier = commonTableModifier.updated("hoodie.bulkinsert.shuffle.parallelism", "4")
.updated(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
.updated(DataSourceWriteOptions.ENABLE_ROW_WRITER.key, "true")
- .updated(INSERT_DROP_DUPS.key, "true")
+ .updated(DataSourceWriteOptions.INSERT_DROP_DUPS.key, "true")
// generate the inserts
val schema = DataSourceTestUtils.getStructTypeExampleSchema
@@ -687,10 +578,11 @@ def testBulkInsertForDropPartitionColumn(): Unit = {
.setBaseFileFormat(fooTableParams.getOrElse(HoodieWriteConfig.BASE_FILE_FORMAT.key, HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().name))
.setArchiveLogFolder(HoodieTableConfig.ARCHIVELOG_FOLDER.defaultValue())
- .setPayloadClassName(PAYLOAD_CLASS_NAME.key)
- .setPreCombineField(fooTableParams.getOrElse(PRECOMBINE_FIELD.key, PRECOMBINE_FIELD.defaultValue()))
+ .setPayloadClassName(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key)
+ .setPreCombineField(fooTableParams.getOrElse(DataSourceWriteOptions.PRECOMBINE_FIELD.key, DataSourceWriteOptions.PRECOMBINE_FIELD.defaultValue()))
.setPartitionFields(fooTableParams(DataSourceWriteOptions.PARTITIONPATH_FIELD.key))
- .setKeyGeneratorClassProp(fooTableParams.getOrElse(KEYGENERATOR_CLASS_NAME.key, KEYGENERATOR_CLASS_NAME.defaultValue()))
+ .setKeyGeneratorClassProp(fooTableParams.getOrElse(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key,
+ DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.defaultValue()))
if(addBootstrapPath) {
tableMetaClientBuilder
.setBootstrapBasePath(fooTableParams(HoodieBootstrapConfig.BASE_PATH.key))
@@ -1364,19 +1256,19 @@ object TestHoodieSparkSqlWriter {
// NOTE: Hudi doesn't support Orc in Spark < 3.0
// Please check HUDI-4496 for more details
- val targetScenarios = if (gteqSpark3_0) {
+ val targetScenarios = if (HoodieSparkUtils.gteqSpark3_0) {
parquetScenarios ++ orcScenarios
} else {
parquetScenarios
}
- java.util.Arrays.stream(targetScenarios.map(as => arguments(as.map(_.asInstanceOf[AnyRef]):_*)))
+ java.util.Arrays.stream(targetScenarios.map(as => Arguments.arguments(as.map(_.asInstanceOf[AnyRef]):_*)))
}
def deletePartitionsWildcardTestParams(): java.util.stream.Stream[Arguments] = {
java.util.stream.Stream.of(
- arguments("*5/03/1*", Seq("2016/03/15")),
- arguments("2016/03/*", Seq("2015/03/16", "2015/03/17")))
+ Arguments.arguments("*5/03/1*", Seq("2016/03/15")),
+ Arguments.arguments("2016/03/*", Seq("2015/03/16", "2015/03/17")))
}
}
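
Note the pattern running through the hunks above: because the suite now extends HoodieSparkWriterTestBase, the wildcard imports (DataSourceWriteOptions._, org.apache.spark.sql._, provider._) were replaced with explicit or fully qualified references, which keeps the subclass's scope unambiguous. A minimal before/after sketch of that style change (the keys are taken from the diff; this is an illustration, not additional code from the commit):

    // Before: wildcard import brings the option keys into scope directly.
    import org.apache.hudi.DataSourceWriteOptions._
    val before = Map(OPERATION.key -> INSERT_OPERATION_OPT_VAL)

    // After: no wildcard import; keys referenced through their owning object.
    val after = Map(DataSourceWriteOptions.OPERATION.key ->
      DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)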
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriterUtc.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriterUtc.scala
index df8614f5e2a..ca4d23f719d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriterUtc.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriterUtc.scala
@@ -36,7 +36,7 @@ import java.util.TimeZone
* value of static {@link HoodieInstantTimeGenerator.lastInstantTime} in the heap,
* which will be greater than instant time for {@link HoodieTimelineTimeZone.UTC}.
*/
-class TestHoodieSparkSqlWriterUtc extends TestHoodieSparkSqlWriter {
+class TestHoodieSparkSqlWriterUtc extends HoodieSparkWriterTestBase {
/*
* Test case verifying that the instant is generated with the commit timezone when TIMELINE_TIMEZONE is set to UTC,
* related to HUDI-5978
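
As the scaladoc notes, HoodieInstantTimeGenerator.lastInstantTime is static, so an instant created under a zone ahead of UTC can force later UTC instants to wait until they exceed the saved value; extending the lean base class keeps the UTC suite from re-running every inherited test under that condition. The exact test body is outside this hunk, but the timezone pinning such a test relies on would look roughly like this (assumed shape, not code from the commit):

    import java.util.TimeZone

    // Pin the JVM default zone to UTC for the duration of the test,
    // restoring the previous zone afterwards.
    val previousTz = TimeZone.getDefault
    try {
      TimeZone.setDefault(TimeZone.getTimeZone("UTC"))
      // ... write with the timeline timezone set to UTC and assert on the instant time ...
    } finally {
      TimeZone.setDefault(previousTz)
    }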
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestTableSchemaResolverWithSparkSQL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestTableSchemaResolverWithSparkSQL.scala
index d9d5b59c8d7..70886d96444 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestTableSchemaResolverWithSparkSQL.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestTableSchemaResolverWithSparkSQL.scala
@@ -18,120 +18,24 @@
package org.apache.hudi
import org.apache.avro.Schema
-import org.apache.commons.io.FileUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.avro.model.HoodieMetadataRecord
-import org.apache.hudi.common.model._
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.testutils.DataSourceTestUtils
-import org.apache.hudi.testutils.HoodieClientTestUtils.getSparkConfForTest
-import org.apache.spark.SparkContext
-import org.apache.spark.sql._
-import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
+import org.apache.spark.sql.SaveMode
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Tag, Test}
+import org.junit.jupiter.api.{Tag, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.CsvSource
-import scala.collection.JavaConverters
-
/**
* Test suite for TableSchemaResolver with SparkSqlWriter.
*/
@Tag("functional")
-class TestTableSchemaResolverWithSparkSQL {
- var spark: SparkSession = _
- var sqlContext: SQLContext = _
- var sc: SparkContext = _
- var tempPath: java.nio.file.Path = _
- var tempBootStrapPath: java.nio.file.Path = _
- var hoodieFooTableName = "hoodie_foo_tbl"
- var tempBasePath: String = _
- var commonTableModifier: Map[String, String] = Map()
-
- case class StringLongTest(uuid: String, ts: Long)
-
- /**
- * Setup method running before each test.
- */
- @BeforeEach
- def setUp(): Unit = {
- initSparkContext()
- tempPath = java.nio.file.Files.createTempDirectory("hoodie_test_path")
- tempBootStrapPath = java.nio.file.Files.createTempDirectory("hoodie_test_bootstrap")
- tempBasePath = tempPath.toAbsolutePath.toString
- commonTableModifier = getCommonParams(tempPath, hoodieFooTableName, HoodieTableType.COPY_ON_WRITE.name())
- }
-
- /**
- * Tear down method running after each test.
- */
- @AfterEach
- def tearDown(): Unit = {
- cleanupSparkContexts()
- FileUtils.deleteDirectory(tempPath.toFile)
- FileUtils.deleteDirectory(tempBootStrapPath.toFile)
- }
-
- /**
- * Utility method for initializing the spark context.
- */
- def initSparkContext(): Unit = {
- spark = SparkSession.builder()
- .config(getSparkConfForTest(hoodieFooTableName))
- .getOrCreate()
- sc = spark.sparkContext
- sc.setLogLevel("ERROR")
- sqlContext = spark.sqlContext
- }
-
- /**
- * Utility method for cleaning up spark resources.
- */
- def cleanupSparkContexts(): Unit = {
- if (sqlContext != null) {
- sqlContext.clearCache();
- sqlContext = null;
- }
- if (sc != null) {
- sc.stop()
- sc = null
- }
- if (spark != null) {
- spark.close()
- }
- }
-
- /**
- * Utility method for creating common params for writer.
- *
- * @param path Path for hoodie table
- * @param hoodieFooTableName Name of hoodie table
- * @param tableType Type of table
- * @return Map of common params
- */
- def getCommonParams(path: java.nio.file.Path, hoodieFooTableName: String, tableType: String): Map[String, String] = {
- Map("path" -> path.toAbsolutePath.toString,
- HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
- "hoodie.insert.shuffle.parallelism" -> "1",
- "hoodie.upsert.shuffle.parallelism" -> "1",
- DataSourceWriteOptions.TABLE_TYPE.key -> tableType,
- DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
- DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
- DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.SimpleKeyGenerator")
- }
-
- /**
- * Utility method for converting list of Row to list of Seq.
- *
- * @param inputList list of Row
- * @return list of Seq
- */
- def convertRowListToSeq(inputList: java.util.List[Row]): Seq[Row] =
- JavaConverters.asScalaIteratorConverter(inputList.iterator).asScala.toSeq
+class TestTableSchemaResolverWithSparkSQL extends HoodieSparkWriterTestBase {
@Test
def testTableSchemaResolverInMetadataTable(): Unit = {