This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch release-0.12.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 7777c78e5c9cc7908bbd6ba65315c7beabbc75fd
Author: Yann Byron <[email protected]>
AuthorDate: Sat Aug 6 18:29:34 2022 +0800

    [HUDI-4514] optimize CTAS to adapt to saveAsTable api in different modes 
(#6295)
---
 .../sql/catalyst/catalog/HoodieCatalogTable.scala  | 15 ++++-
 .../command/CreateHoodieTableAsSelectCommand.scala |  9 ++-
 .../apache/hudi/functional/TestCOWDataSource.scala | 70 ++++++++++++++++++++--
 .../apache/spark/sql/hudi/TestCreateTable.scala    |  2 +-
 4 files changed, 83 insertions(+), 13 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
index e27c15ebcf..09981e845a 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
@@ -23,7 +23,7 @@ import org.apache.hudi.HoodieWriterUtils._
 import org.apache.hudi.common.config.DFSPropertiesConfiguration
 import org.apache.hudi.common.model.HoodieTableType
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
-import org.apache.hudi.common.util.ValidationUtils
+import org.apache.hudi.common.util.{StringUtils, ValidationUtils}
 import org.apache.hudi.keygen.ComplexKeyGenerator
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
 import org.apache.spark.internal.Logging
@@ -91,6 +91,11 @@ class HoodieCatalogTable(val spark: SparkSession, var table: 
CatalogTable) exten
    */
   lazy val tableName: String = tableConfig.getTableName
 
+  /**
+   * The name of the database
+   */
+  lazy val databaseName: String = tableConfig.getDatabaseName
+
   /**
    * The name of type of table
    */
@@ -171,19 +176,23 @@ class HoodieCatalogTable(val spark: SparkSession, var 
table: CatalogTable) exten
     val properties = new Properties()
     properties.putAll(tableConfigs.asJava)
 
+    val catalogDatabaseName = formatName(spark,
+      
table.identifier.database.getOrElse(spark.sessionState.catalog.getCurrentDatabase))
     if (hoodieTableExists) {
+      assert(StringUtils.isNullOrEmpty(databaseName) || databaseName == 
catalogDatabaseName,
+        "The database names from this hoodie path and this catalog table is 
not same.")
       // just persist hoodie.table.create.schema
       HoodieTableMetaClient.withPropertyBuilder()
         .fromProperties(properties)
+        .setDatabaseName(catalogDatabaseName)
         
.setTableCreateSchema(SchemaConverters.toAvroType(finalSchema).toString())
         .initTable(hadoopConf, tableLocation)
     } else {
       val (recordName, namespace) = 
AvroConversionUtils.getAvroRecordNameAndNamespace(table.identifier.table)
       val schema = SchemaConverters.toAvroType(finalSchema, false, recordName, 
namespace)
-      val hoodieDatabaseName = formatName(spark, 
table.identifier.database.getOrElse(spark.sessionState.catalog.getCurrentDatabase))
       HoodieTableMetaClient.withPropertyBuilder()
         .fromProperties(properties)
-        .setDatabaseName(hoodieDatabaseName)
+        .setDatabaseName(catalogDatabaseName)
         .setTableName(table.identifier.table)
         .setTableCreateSchema(schema.toString())
         .setPartitionFields(table.partitionColumnNames.mkString(","))
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
index b43d3a3f85..6a93f0c7af 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
@@ -29,7 +29,6 @@ import 
org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable.needFilterProps
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, 
CatalogTable, CatalogTableType, HoodieCatalogTable}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
-import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
 import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}
 
 import scala.collection.JavaConverters._
@@ -62,8 +61,9 @@ case class CreateHoodieTableAsSelectCommand(
         s"Expect the table $tableName has been dropped when the save mode is 
Overwrite")
 
       if (mode == SaveMode.ErrorIfExists) {
-        throw new RuntimeException(s"Table $tableName already exists. You need 
to drop it first.")
+        throw new AnalysisException(s"Table $tableName already exists. You 
need to drop it first.")
       }
+
       if (mode == SaveMode.Ignore) {
         // Since the table already exists and the save mode is Ignore, we will 
just return.
         // scalastyle:off
@@ -92,8 +92,6 @@ case class CreateHoodieTableAsSelectCommand(
     val hoodieCatalogTable = HoodieCatalogTable(sparkSession, newTable)
     val tablePath = hoodieCatalogTable.tableLocation
     val hadoopConf = sparkSession.sessionState.newHadoopConf()
-    assert(HoodieSqlCommonUtils.isEmptyPath(tablePath, hadoopConf),
-      s"Path '$tablePath' should be empty for CTAS")
 
     // Execute the insert query
     try {
@@ -108,7 +106,8 @@ case class CreateHoodieTableAsSelectCommand(
         DataSourceWriteOptions.SQL_INSERT_MODE.key -> 
InsertMode.NON_STRICT.value(),
         DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key -> "true"
       )
-      val success = InsertIntoHoodieTableCommand.run(sparkSession, newTable, 
reOrderedQuery, Map.empty,
+      val partitionSpec = newTable.partitionColumnNames.map((_, None)).toMap
+      val success = InsertIntoHoodieTableCommand.run(sparkSession, newTable, 
reOrderedQuery, partitionSpec,
         mode == SaveMode.Overwrite, refreshTable = false, extraOptions = 
options)
       if (success) {
         // If write success, create the table in catalog if it has not synced 
to the
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 631b5d0894..c7eef5bce4 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -33,7 +33,7 @@ import org.apache.hudi.keygen._
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
 import org.apache.hudi.testutils.HoodieClientTestBase
 import org.apache.hudi.util.JFunction
-import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, 
DataSourceWriteOptions, HoodieDataSourceHelpers}
+import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, 
DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieSparkUtils}
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions.{col, concat, lit, udf}
 import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
@@ -978,9 +978,6 @@ class TestCOWDataSource extends HoodieClientTestBase {
 
     df.write.format("hudi")
       .options(commonOpts)
-      .option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "id")
-      .option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "id")
-      .option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "id")
       .option("hoodie.insert.shuffle.parallelism", "4")
       .option("hoodie.upsert.shuffle.parallelism", "4")
       .option("hoodie.bulkinsert.shuffle.parallelism", "2")
@@ -1045,4 +1042,69 @@ class TestCOWDataSource extends HoodieClientTestBase {
       )
     }
   }
+
+  @Test
+  def testSaveAsTableInDifferentModes(): Unit = {
+    val options = scala.collection.mutable.Map.empty ++ commonOpts ++ 
Map("path" -> basePath)
+
+    // first use the Append mode
+    val records1 = recordsToStrings(dataGen.generateInserts("001", 5)).toList
+    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+    inputDF1.write.format("org.apache.hudi")
+      .partitionBy("partition")
+      .options(options)
+      .mode(SaveMode.Append)
+      .saveAsTable("hoodie_test")
+
+    // init metaClient
+    metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(basePath)
+      .setConf(spark.sessionState.newHadoopConf)
+      .build()
+    assertEquals(spark.read.format("hudi").load(basePath).count(), 5)
+
+    // use the Append mode
+    val records2 = recordsToStrings(dataGen.generateInserts("002", 6)).toList
+    val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
+    inputDF2.write.format("org.apache.hudi")
+      .partitionBy("partition")
+      .options(options)
+      .mode(SaveMode.Append)
+      .saveAsTable("hoodie_test")
+    assertEquals(spark.read.format("hudi").load(basePath).count(), 11)
+
+    // use the Ignore mode
+    val records3 = recordsToStrings(dataGen.generateInserts("003", 7)).toList
+    val inputDF3 = spark.read.json(spark.sparkContext.parallelize(records3, 2))
+    inputDF3.write.format("org.apache.hudi")
+      .partitionBy("partition")
+      .options(options)
+      .mode(SaveMode.Ignore)
+      .saveAsTable("hoodie_test")
+    // nothing to do for the ignore mode
+    assertEquals(spark.read.format("hudi").load(basePath).count(), 11)
+
+    // use the ErrorIfExists mode
+    val records4 = recordsToStrings(dataGen.generateInserts("004", 8)).toList
+    val inputDF4 = spark.read.json(spark.sparkContext.parallelize(records4, 2))
+    try {
+      inputDF4.write.format("org.apache.hudi")
+        .partitionBy("partition")
+        .options(options)
+        .mode(SaveMode.ErrorIfExists)
+        .saveAsTable("hoodie_test")
+    } catch {
+      case e: Throwable => // do nothing
+    }
+
+    // use the Overwrite mode
+    val records5 = recordsToStrings(dataGen.generateInserts("005", 9)).toList
+    val inputDF5 = spark.read.json(spark.sparkContext.parallelize(records5, 2))
+    inputDF5.write.format("org.apache.hudi")
+      .partitionBy("partition")
+      .options(options)
+      .mode(SaveMode.Overwrite)
+      .saveAsTable("hoodie_test")
+    assertEquals(spark.read.format("hudi").load(basePath).count(), 9)
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
index 70848529dc..d3dbf9a6e6 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
@@ -670,7 +670,7 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
         
assertResult(true)(properties.contains(HoodieTableConfig.CREATE_SCHEMA.key))
         assertResult("dt")(properties(HoodieTableConfig.PARTITION_FIELDS.key))
         assertResult("ts")(properties(HoodieTableConfig.PRECOMBINE_FIELD.key))
-        assertResult("")(metaClient.getTableConfig.getDatabaseName)
+        
assertResult("hudi_database")(metaClient.getTableConfig.getDatabaseName)
         
assertResult(s"original_$tableName")(metaClient.getTableConfig.getTableName)
 
         // Test insert into

Reply via email to