This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 80c3f033b54 [MINOR] Separate HoodieSparkWriterTestBase to reduce 
duplication (#10832)
80c3f033b54 is described below

commit 80c3f033b542f1011cbdead8a184d4753e98ca85
Author: Geser Dugarov <[email protected]>
AuthorDate: Fri Mar 8 07:52:52 2024 +0700

    [MINOR] Separate HoodieSparkWriterTestBase to reduce duplication (#10832)
---
 .../apache/hudi/HoodieSparkWriterTestBase.scala    | 136 ++++++++++++++++++
 .../org/apache/hudi/TestHoodieSparkSqlWriter.scala | 152 +++------------------
 .../apache/hudi/TestHoodieSparkSqlWriterUtc.scala  |   2 +-
 .../hudi/TestTableSchemaResolverWithSparkSQL.scala | 102 +-------------
 4 files changed, 162 insertions(+), 230 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkWriterTestBase.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkWriterTestBase.scala
new file mode 100644
index 00000000000..c0c1c2c12bd
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkWriterTestBase.scala
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.commons.io.FileUtils
+import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType}
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.testutils.HoodieClientTestUtils
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
+import org.apache.spark.sql.{Dataset, Row, SQLContext, SparkSession}
+import org.junit.jupiter.api.{AfterEach, BeforeEach}
+
+import scala.collection.JavaConverters
+
+class HoodieSparkWriterTestBase {
+  var spark: SparkSession = _
+  var sqlContext: SQLContext = _
+  var sc: SparkContext = _
+  var tempPath: java.nio.file.Path = _
+  var tempBootStrapPath: java.nio.file.Path = _
+  var hoodieFooTableName = "hoodie_foo_tbl"
+  var tempBasePath: String = _
+  var commonTableModifier: Map[String, String] = Map()
+
+  case class StringLongTest(uuid: String, ts: Long)
+
+  /**
+   * Setup method running before each test.
+   */
+  @BeforeEach
+  def setUp(): Unit = {
+    initSparkContext()
+    tempPath = java.nio.file.Files.createTempDirectory("hoodie_test_path")
+    tempBootStrapPath = 
java.nio.file.Files.createTempDirectory("hoodie_test_bootstrap")
+    tempBasePath = tempPath.toAbsolutePath.toString
+    commonTableModifier = getCommonParams(tempPath, hoodieFooTableName, 
HoodieTableType.COPY_ON_WRITE.name())
+  }
+
+  /**
+   * Tear down method running after each test.
+   */
+  @AfterEach
+  def tearDown(): Unit = {
+    cleanupSparkContexts()
+    FileUtils.deleteDirectory(tempPath.toFile)
+    FileUtils.deleteDirectory(tempBootStrapPath.toFile)
+  }
+
+  /**
+   * Utility method for initializing the spark context.
+   */
+  def initSparkContext(): Unit = {
+    val sparkConf = 
HoodieClientTestUtils.getSparkConfForTest(getClass.getSimpleName)
+
+    spark = SparkSession.builder()
+      .withExtensions(new HoodieSparkSessionExtension)
+      .config(sparkConf)
+      .getOrCreate()
+
+    sc = spark.sparkContext
+    sc.setLogLevel("ERROR")
+    sqlContext = spark.sqlContext
+  }
+
+  /**
+   * Utility method for cleaning up spark resources.
+   */
+  def cleanupSparkContexts(): Unit = {
+    if (sqlContext != null) {
+      sqlContext.clearCache();
+      sqlContext = null;
+    }
+    if (sc != null) {
+      sc.stop()
+      sc = null
+    }
+    if (spark != null) {
+      spark.close()
+    }
+  }
+
+  /**
+   * Utility method for creating common params for writer.
+   *
+   * @param path               Path for hoodie table
+   * @param hoodieFooTableName Name of hoodie table
+   * @param tableType          Type of table
+   * @return Map of common params
+   */
+  def getCommonParams(path: java.nio.file.Path, hoodieFooTableName: String, 
tableType: String): Map[String, String] = {
+    Map("path" -> path.toAbsolutePath.toString,
+      HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
+      "hoodie.insert.shuffle.parallelism" -> "1",
+      "hoodie.upsert.shuffle.parallelism" -> "1",
+      DataSourceWriteOptions.TABLE_TYPE.key -> tableType,
+      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
+      DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> 
"org.apache.hudi.keygen.SimpleKeyGenerator")
+  }
+
+  /**
+   * Utility method for dropping all hoodie meta related columns.
+   */
+  def dropMetaFields(df: Dataset[Row]): Dataset[Row] = {
+    
df.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
+      
.drop(HoodieRecord.HOODIE_META_COLUMNS.get(2)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(3))
+      .drop(HoodieRecord.HOODIE_META_COLUMNS.get(4))
+  }
+
+  /**
+   * Utility method for converting list of Row to list of Seq.
+   *
+   * @param inputList list of Row
+   * @return list of Seq
+   */
+  def convertRowListToSeq(inputList: java.util.List[Row]): Seq[Row] =
+    JavaConverters.asScalaIteratorConverter(inputList.iterator).asScala.toSeq
+
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index d7a1f9331ae..0767d055915 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -19,10 +19,8 @@ package org.apache.hudi
 
 import org.apache.avro.Schema
 import org.apache.commons.io.FileUtils
-import org.apache.hudi.DataSourceWriteOptions._
-import org.apache.hudi.HoodieSparkUtils.gteqSpark3_0
 import org.apache.hudi.client.SparkRDDWriteClient
-import org.apache.hudi.common.model._
+import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord, 
HoodieRecordPayload, HoodieTableType, WriteOperationType}
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, 
TableSchemaResolver}
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieIndexConfig, 
HoodieWriteConfig}
@@ -30,19 +28,15 @@ import org.apache.hudi.exception.{HoodieException, 
SchemaCompatibilityException}
 import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode
 import org.apache.hudi.functional.TestBootstrap
 import org.apache.hudi.keygen.{ComplexKeyGenerator, 
NonpartitionedKeyGenerator, SimpleKeyGenerator}
-import org.apache.hudi.testutils.DataSourceTestUtils
-import org.apache.hudi.testutils.HoodieClientTestUtils.getSparkConfForTest
-import org.apache.spark.SparkContext
+import org.apache.hudi.testutils.{DataSourceTestUtils, HoodieClientTestUtils}
 import org.apache.spark.api.java.JavaSparkContext
-import org.apache.spark.sql._
+import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
 import org.apache.spark.sql.functions.{expr, lit}
-import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
 import org.apache.spark.sql.hudi.command.SqlKeyGenerator
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, 
assertNotNull, assertNull, assertTrue, fail}
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.Test
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.Arguments.arguments
-import org.junit.jupiter.params.provider._
+import org.junit.jupiter.params.provider.{Arguments, CsvSource, EnumSource, 
MethodSource, ValueSource}
 import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito.{spy, times, verify}
 import org.scalatest.Assertions.assertThrows
@@ -52,7 +46,6 @@ import java.io.IOException
 import java.time.Instant
 import java.util.{Collections, Date, UUID}
 import scala.collection.JavaConversions._
-import scala.collection.JavaConverters
 
 /**
  * Test suite for SparkSqlWriter class.
@@ -60,113 +53,10 @@ import scala.collection.JavaConverters
  * Otherwise UTC tests will generate infinite loops, if there is any initiated 
test with time zone that is greater then UTC+0.
  * The reason is in a saved value in the heap of static {@link 
org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator.lastInstantTime}.
  */
-class TestHoodieSparkSqlWriter {
-  var spark: SparkSession = _
-  var sqlContext: SQLContext = _
-  var sc: SparkContext = _
-  var tempPath: java.nio.file.Path = _
-  var tempBootStrapPath: java.nio.file.Path = _
-  var hoodieFooTableName = "hoodie_foo_tbl"
-  var tempBasePath: String = _
-  var commonTableModifier: Map[String, String] = Map()
-  case class StringLongTest(uuid: String, ts: Long)
+class TestHoodieSparkSqlWriter extends HoodieSparkWriterTestBase {
 
   /**
-   * Setup method running before each test.
-   */
-  @BeforeEach
-  def setUp(): Unit = {
-    initSparkContext()
-    tempPath = java.nio.file.Files.createTempDirectory("hoodie_test_path")
-    tempBootStrapPath = 
java.nio.file.Files.createTempDirectory("hoodie_test_bootstrap")
-    tempBasePath = tempPath.toAbsolutePath.toString
-    commonTableModifier = getCommonParams(tempPath, hoodieFooTableName, 
HoodieTableType.COPY_ON_WRITE.name())
-  }
-
-  /**
-   * Tear down method running after each test.
-   */
-  @AfterEach
-  def tearDown(): Unit = {
-    cleanupSparkContexts()
-    FileUtils.deleteDirectory(tempPath.toFile)
-    FileUtils.deleteDirectory(tempBootStrapPath.toFile)
-  }
-
-  /**
-   * Utility method for initializing the spark context.
-   *
-   * TODO rebase this onto existing base class to avoid duplication
-   */
-  def initSparkContext(): Unit = {
-    val sparkConf = getSparkConfForTest(getClass.getSimpleName)
-
-    spark = SparkSession.builder()
-      .withExtensions(new HoodieSparkSessionExtension)
-      .config(sparkConf)
-      .getOrCreate()
-
-    sc = spark.sparkContext
-    sc.setLogLevel("ERROR")
-    sqlContext = spark.sqlContext
-  }
-
-  /**
-   * Utility method for cleaning up spark resources.
-   */
-  def cleanupSparkContexts(): Unit = {
-    if (sqlContext != null) {
-      sqlContext.clearCache();
-      sqlContext = null;
-    }
-    if (sc != null) {
-      sc.stop()
-      sc = null
-    }
-    if (spark != null) {
-      spark.close()
-    }
-  }
-
-  /**
-   * Utility method for dropping all hoodie meta related columns.
-   */
-  def dropMetaFields(df: Dataset[Row]): Dataset[Row] = {
-    
df.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
-      
.drop(HoodieRecord.HOODIE_META_COLUMNS.get(2)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(3))
-      .drop(HoodieRecord.HOODIE_META_COLUMNS.get(4))
-  }
-
-  /**
-   * Utility method for creating common params for writer.
-   *
-   * @param path               Path for hoodie table
-   * @param hoodieFooTableName Name of hoodie table
-   * @param tableType          Type of table
-   * @return                   Map of common params
-   */
-  def getCommonParams(path: java.nio.file.Path, hoodieFooTableName: String, 
tableType: String): Map[String, String] = {
-    Map("path" -> path.toAbsolutePath.toString,
-      HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
-      "hoodie.insert.shuffle.parallelism" -> "1",
-      "hoodie.upsert.shuffle.parallelism" -> "1",
-      DataSourceWriteOptions.TABLE_TYPE.key -> tableType,
-      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
-      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
-      DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> 
"org.apache.hudi.keygen.SimpleKeyGenerator")
-  }
-
-  /**
-   * Utility method for converting list of Row to list of Seq.
-   *
-   * @param inputList list of Row
-   * @return list of Seq
-   */
-  def convertRowListToSeq(inputList: java.util.List[Row]): Seq[Row] =
-    JavaConverters.asScalaIteratorConverter(inputList.iterator).asScala.toSeq
-
-  /**
-   * Utility method for performing bulk insert  tests.
+   * Local utility method for performing bulk insert  tests.
    *
    * @param sortMode           Bulk insert sort mode
    * @param populateMetaFields Flag for populating meta fields
@@ -226,12 +116,13 @@ class TestHoodieSparkSqlWriter {
     val originals = HoodieWriterUtils.parametersWithWriteDefaults(Map.empty)
     val rhsKey = "hoodie.right.hand.side.key"
     val rhsVal = "hoodie.right.hand.side.val"
-    val modifier = Map(OPERATION.key -> INSERT_OPERATION_OPT_VAL, 
TABLE_TYPE.key -> MOR_TABLE_TYPE_OPT_VAL, rhsKey -> rhsVal)
+    val modifier = Map(DataSourceWriteOptions.OPERATION.key -> 
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+      DataSourceWriteOptions.TABLE_TYPE.key -> 
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, rhsKey -> rhsVal)
     val modified = HoodieWriterUtils.parametersWithWriteDefaults(modifier)
     val matcher = (k: String, v: String) => modified(k) should be(v)
     originals foreach {
-      case ("hoodie.datasource.write.operation", _) => 
matcher("hoodie.datasource.write.operation", INSERT_OPERATION_OPT_VAL)
-      case ("hoodie.datasource.write.table.type", _) => 
matcher("hoodie.datasource.write.table.type", MOR_TABLE_TYPE_OPT_VAL)
+      case ("hoodie.datasource.write.operation", _) => 
matcher("hoodie.datasource.write.operation", 
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      case ("hoodie.datasource.write.table.type", _) => 
matcher("hoodie.datasource.write.table.type", 
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
       case (`rhsKey`, _) => matcher(rhsKey, rhsVal)
       case (k, v) => matcher(k, v)
     }
@@ -245,7 +136,7 @@ class TestHoodieSparkSqlWriter {
     spark.stop()
     val session = SparkSession.builder()
       // Here we intentionally remove the "spark.serializer" config to test 
failure
-      .config(getSparkConfForTest("hoodie_test").remove("spark.serializer"))
+      
.config(HoodieClientTestUtils.getSparkConfForTest("hoodie_test").remove("spark.serializer"))
       .getOrCreate()
     try {
       val sqlContext = session.sqlContext
@@ -290,7 +181,7 @@ class TestHoodieSparkSqlWriter {
     
assert(tableAlreadyExistException.getMessage.contains(s"${HoodieWriteConfig.TBL_NAME.key}:\thoodie_bar_tbl\thoodie_foo_tbl"))
 
     //on same path try append with delete operation and 
different("hoodie_bar_tbl") table name which should throw an exception
-    val deleteTableModifier = barTableModifier ++ Map(OPERATION.key -> 
"delete")
+    val deleteTableModifier = barTableModifier ++ 
Map(DataSourceWriteOptions.OPERATION.key -> "delete")
     val deleteCmdException = 
intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, 
SaveMode.Append, deleteTableModifier, dataFrame2))
     assert(tableAlreadyExistException.getMessage.contains("Config conflict"))
     
assert(tableAlreadyExistException.getMessage.contains(s"${HoodieWriteConfig.TBL_NAME.key}:\thoodie_bar_tbl\thoodie_foo_tbl"))
@@ -454,7 +345,7 @@ def testBulkInsertForDropPartitionColumn(): Unit = {
       val fooTableModifier = 
commonTableModifier.updated("hoodie.bulkinsert.shuffle.parallelism", "4")
         .updated(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
         .updated(DataSourceWriteOptions.ENABLE_ROW_WRITER.key, "true")
-        .updated(INSERT_DROP_DUPS.key, "true")
+        .updated(DataSourceWriteOptions.INSERT_DROP_DUPS.key, "true")
 
       // generate the inserts
       val schema = DataSourceTestUtils.getStructTypeExampleSchema
@@ -687,10 +578,11 @@ def testBulkInsertForDropPartitionColumn(): Unit = {
       
.setBaseFileFormat(fooTableParams.getOrElse(HoodieWriteConfig.BASE_FILE_FORMAT.key,
         HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().name))
       .setArchiveLogFolder(HoodieTableConfig.ARCHIVELOG_FOLDER.defaultValue())
-      .setPayloadClassName(PAYLOAD_CLASS_NAME.key)
-      .setPreCombineField(fooTableParams.getOrElse(PRECOMBINE_FIELD.key, 
PRECOMBINE_FIELD.defaultValue()))
+      .setPayloadClassName(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key)
+      
.setPreCombineField(fooTableParams.getOrElse(DataSourceWriteOptions.PRECOMBINE_FIELD.key,
 DataSourceWriteOptions.PRECOMBINE_FIELD.defaultValue()))
       
.setPartitionFields(fooTableParams(DataSourceWriteOptions.PARTITIONPATH_FIELD.key))
-      
.setKeyGeneratorClassProp(fooTableParams.getOrElse(KEYGENERATOR_CLASS_NAME.key, 
KEYGENERATOR_CLASS_NAME.defaultValue()))
+      
.setKeyGeneratorClassProp(fooTableParams.getOrElse(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key,
+        DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.defaultValue()))
       if(addBootstrapPath) {
         tableMetaClientBuilder
           
.setBootstrapBasePath(fooTableParams(HoodieBootstrapConfig.BASE_PATH.key))
@@ -1364,19 +1256,19 @@ object TestHoodieSparkSqlWriter {
 
     // NOTE: Hudi doesn't support Orc in Spark < 3.0
     //       Please check HUDI-4496 for more details
-    val targetScenarios = if (gteqSpark3_0) {
+    val targetScenarios = if (HoodieSparkUtils.gteqSpark3_0) {
       parquetScenarios ++ orcScenarios
     } else {
       parquetScenarios
     }
 
-    java.util.Arrays.stream(targetScenarios.map(as => 
arguments(as.map(_.asInstanceOf[AnyRef]):_*)))
+    java.util.Arrays.stream(targetScenarios.map(as => 
Arguments.arguments(as.map(_.asInstanceOf[AnyRef]):_*)))
   }
 
   def deletePartitionsWildcardTestParams(): java.util.stream.Stream[Arguments] 
= {
     java.util.stream.Stream.of(
-      arguments("*5/03/1*", Seq("2016/03/15")),
-      arguments("2016/03/*", Seq("2015/03/16", "2015/03/17")))
+      Arguments.arguments("*5/03/1*", Seq("2016/03/15")),
+      Arguments.arguments("2016/03/*", Seq("2015/03/16", "2015/03/17")))
   }
 
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriterUtc.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriterUtc.scala
index df8614f5e2a..ca4d23f719d 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriterUtc.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriterUtc.scala
@@ -36,7 +36,7 @@ import java.util.TimeZone
  * value of static {@link HoodieInstantTimeGenerator.lastInstantTime} in the 
heap,
  * which will be greater than instant time for {@link 
HoodieTimelineTimeZone.UTC}.
  */
-class TestHoodieSparkSqlWriterUtc extends TestHoodieSparkSqlWriter {
+class TestHoodieSparkSqlWriterUtc extends HoodieSparkWriterTestBase {
   /*
    * Test case for instant is generated with commit timezone when 
TIMELINE_TIMEZONE set to UTC
    * related to HUDI-5978
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestTableSchemaResolverWithSparkSQL.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestTableSchemaResolverWithSparkSQL.scala
index d9d5b59c8d7..70886d96444 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestTableSchemaResolverWithSparkSQL.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestTableSchemaResolverWithSparkSQL.scala
@@ -18,120 +18,24 @@
 package org.apache.hudi
 
 import org.apache.avro.Schema
-import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.avro.model.HoodieMetadataRecord
-import org.apache.hudi.common.model._
 import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.testutils.DataSourceTestUtils
-import org.apache.hudi.testutils.HoodieClientTestUtils.getSparkConfForTest
-import org.apache.spark.SparkContext
-import org.apache.spark.sql._
-import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
+import org.apache.spark.sql.SaveMode
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Tag, Test}
+import org.junit.jupiter.api.{Tag, Test}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.CsvSource
 
-import scala.collection.JavaConverters
-
 /**
  * Test suite for TableSchemaResolver with SparkSqlWriter.
  */
 @Tag("functional")
-class TestTableSchemaResolverWithSparkSQL {
-  var spark: SparkSession = _
-  var sqlContext: SQLContext = _
-  var sc: SparkContext = _
-  var tempPath: java.nio.file.Path = _
-  var tempBootStrapPath: java.nio.file.Path = _
-  var hoodieFooTableName = "hoodie_foo_tbl"
-  var tempBasePath: String = _
-  var commonTableModifier: Map[String, String] = Map()
-
-  case class StringLongTest(uuid: String, ts: Long)
-
-  /**
-   * Setup method running before each test.
-   */
-  @BeforeEach
-  def setUp(): Unit = {
-    initSparkContext()
-    tempPath = java.nio.file.Files.createTempDirectory("hoodie_test_path")
-    tempBootStrapPath = 
java.nio.file.Files.createTempDirectory("hoodie_test_bootstrap")
-    tempBasePath = tempPath.toAbsolutePath.toString
-    commonTableModifier = getCommonParams(tempPath, hoodieFooTableName, 
HoodieTableType.COPY_ON_WRITE.name())
-  }
-
-  /**
-   * Tear down method running after each test.
-   */
-  @AfterEach
-  def tearDown(): Unit = {
-    cleanupSparkContexts()
-    FileUtils.deleteDirectory(tempPath.toFile)
-    FileUtils.deleteDirectory(tempBootStrapPath.toFile)
-  }
-
-  /**
-   * Utility method for initializing the spark context.
-   */
-  def initSparkContext(): Unit = {
-    spark = SparkSession.builder()
-      .config(getSparkConfForTest(hoodieFooTableName))
-      .getOrCreate()
-    sc = spark.sparkContext
-    sc.setLogLevel("ERROR")
-    sqlContext = spark.sqlContext
-  }
-
-  /**
-   * Utility method for cleaning up spark resources.
-   */
-  def cleanupSparkContexts(): Unit = {
-    if (sqlContext != null) {
-      sqlContext.clearCache();
-      sqlContext = null;
-    }
-    if (sc != null) {
-      sc.stop()
-      sc = null
-    }
-    if (spark != null) {
-      spark.close()
-    }
-  }
-
-  /**
-   * Utility method for creating common params for writer.
-   *
-   * @param path               Path for hoodie table
-   * @param hoodieFooTableName Name of hoodie table
-   * @param tableType          Type of table
-   * @return Map of common params
-   */
-  def getCommonParams(path: java.nio.file.Path, hoodieFooTableName: String, 
tableType: String): Map[String, String] = {
-    Map("path" -> path.toAbsolutePath.toString,
-      HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
-      "hoodie.insert.shuffle.parallelism" -> "1",
-      "hoodie.upsert.shuffle.parallelism" -> "1",
-      DataSourceWriteOptions.TABLE_TYPE.key -> tableType,
-      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
-      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
-      DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> 
"org.apache.hudi.keygen.SimpleKeyGenerator")
-  }
-
-  /**
-   * Utility method for converting list of Row to list of Seq.
-   *
-   * @param inputList list of Row
-   * @return list of Seq
-   */
-  def convertRowListToSeq(inputList: java.util.List[Row]): Seq[Row] =
-    JavaConverters.asScalaIteratorConverter(inputList.iterator).asScala.toSeq
+class TestTableSchemaResolverWithSparkSQL extends HoodieSparkWriterTestBase {
 
   @Test
   def testTableSchemaResolverInMetadataTable(): Unit = {

Reply via email to