This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 75f847691f [HUDI-4001] Filter the properties should not be used when 
create table for Spark SQL (#5495)
75f847691f is described below

commit 75f847691f0bdaf226d4713a8cb8c7639cffd5e5
Author: 董可伦 <[email protected]>
AuthorDate: Mon May 16 09:50:29 2022 +0800

    [HUDI-4001] Filter the properties should not be used when create table for 
Spark SQL (#5495)
---
 .../sql/catalyst/catalog/HoodieCatalogTable.scala  |   3 +
 .../spark/sql/hudi/ProvidesHoodieConfig.scala      |   3 +-
 .../hudi/command/CreateHoodieTableCommand.scala    |   6 +-
 .../command/CreateHoodieTableAsSelectCommand.scala |  23 ++++-
 .../apache/spark/sql/hudi/TestCreateTable.scala    | 103 ++++++++++++++++++++-
 5 files changed, 127 insertions(+), 11 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
index 7ee8f6ad56..76cea362a3 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.catalyst.catalog
 
 import org.apache.hudi.AvroConversionUtils
+import org.apache.hudi.DataSourceWriteOptions.OPERATION
 import org.apache.hudi.HoodieWriterUtils._
 import org.apache.hudi.common.config.DFSPropertiesConfiguration
 import org.apache.hudi.common.model.HoodieTableType
@@ -321,6 +322,8 @@ class HoodieCatalogTable(val spark: SparkSession, val 
table: CatalogTable) exten
 }
 
 object HoodieCatalogTable {
+  // Properties that should not be used when creating a table
+  val needFilterProps: List[String] = 
List(HoodieTableConfig.DATABASE_NAME.key, HoodieTableConfig.NAME.key, 
OPERATION.key)
 
   def apply(sparkSession: SparkSession, tableIdentifier: TableIdentifier): 
HoodieCatalogTable = {
     val catalogTable = 
sparkSession.sessionState.catalog.getTableMetadata(tableIdentifier)
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index 31fb0ad6cb..131ebebe85 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -255,8 +255,7 @@ trait ProvidesHoodieConfig extends Logging {
     val hoodieProps = getHoodieProps(catalogProperties, tableConfig, 
sparkSession.sqlContext.conf)
     val hiveSyncConfig = buildHiveSyncConfig(hoodieProps, hoodieCatalogTable)
 
-    // operation can not be overwrite
-    val options = hoodieCatalogTable.catalogProperties.-(OPERATION.key())
+    val options = hoodieCatalogTable.catalogProperties
 
     withSparkConf(sparkSession, options) {
       Map(
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
index 195bf4153c..9bf1d72152 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
@@ -26,6 +26,7 @@ import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils
 import org.apache.hudi.{DataSourceWriteOptions, SparkAdapterSupport}
 import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException
 import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable.needFilterProps
 import org.apache.spark.sql.hive.HiveClientUtils
 import org.apache.spark.sql.hive.HiveExternalCatalog._
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isEnableHive
@@ -130,8 +131,9 @@ object CreateHoodieTableCommand {
       .copy(table = tableName, database = Some(newDatabaseName))
 
     val partitionColumnNames = hoodieCatalogTable.partitionSchema.map(_.name)
-    // append pk, preCombineKey, type to the properties of table
-    val newTblProperties = hoodieCatalogTable.catalogProperties ++ 
HoodieOptionConfig.extractSqlOptions(properties)
+    // Remove properties that should not be used; append pk, preCombineKey, 
type to the table properties
+    val newTblProperties =
+      hoodieCatalogTable.catalogProperties.--(needFilterProps) ++ 
HoodieOptionConfig.extractSqlOptions(properties)
     val newTable = table.copy(
       identifier = newTableIdentifier,
       storage = newStorage,
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
index 1d2cea10af..66aeb850e4 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
@@ -23,7 +23,8 @@ import org.apache.hudi.DataSourceWriteOptions
 import org.apache.hudi.hive.HiveSyncConfig
 import org.apache.hudi.hive.util.ConfigUtils
 import org.apache.hudi.sql.InsertMode
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, 
HoodieCatalogTable}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, 
CatalogTable, CatalogTableType, HoodieCatalogTable}
+import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable.needFilterProps
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
@@ -66,9 +67,21 @@ case class CreateHoodieTableAsSelectCommand(
 
     // ReOrder the query which move the partition columns to the last of the 
project list
     val reOrderedQuery = reOrderPartitionColumn(query, 
table.partitionColumnNames)
-    val tableWithSchema = table.copy(schema = reOrderedQuery.schema)
+    // Remove properties that should not be used
+    val newStorage = new CatalogStorageFormat(
+      table.storage.locationUri,
+      table.storage.inputFormat,
+      table.storage.outputFormat,
+      table.storage.serde,
+      table.storage.compressed,
+      table.storage.properties.--(needFilterProps))
+    val newTable = table.copy(
+      storage = newStorage,
+      schema = reOrderedQuery.schema,
+      properties = table.properties.--(needFilterProps)
+    )
 
-    val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableWithSchema)
+    val hoodieCatalogTable = HoodieCatalogTable(sparkSession, newTable)
     val tablePath = hoodieCatalogTable.tableLocation
     val hadoopConf = sparkSession.sessionState.newHadoopConf()
     assert(HoodieSqlCommonUtils.isEmptyPath(tablePath, hadoopConf),
@@ -83,11 +96,11 @@ case class CreateHoodieTableAsSelectCommand(
       val options = Map(
         HiveSyncConfig.HIVE_CREATE_MANAGED_TABLE.key -> (table.tableType == 
CatalogTableType.MANAGED).toString,
         HiveSyncConfig.HIVE_TABLE_SERDE_PROPERTIES.key -> 
ConfigUtils.configToString(tblProperties.asJava),
-        HiveSyncConfig.HIVE_TABLE_PROPERTIES.key -> 
ConfigUtils.configToString(table.properties.asJava),
+        HiveSyncConfig.HIVE_TABLE_PROPERTIES.key -> 
ConfigUtils.configToString(newTable.properties.asJava),
         DataSourceWriteOptions.SQL_INSERT_MODE.key -> 
InsertMode.NON_STRICT.value(),
         DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key -> "true"
       )
-      val success = InsertIntoHoodieTableCommand.run(sparkSession, 
tableWithSchema, reOrderedQuery, Map.empty,
+      val success = InsertIntoHoodieTableCommand.run(sparkSession, newTable, 
reOrderedQuery, Map.empty,
         mode == SaveMode.Overwrite, refreshTable = false, extraOptions = 
options)
       if (success) {
         // If write success, create the table in catalog if it has not synced 
to the
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
index 69147272da..5435aad05e 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
@@ -29,6 +29,8 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTableType
 import org.apache.spark.sql.types._
 
+import org.junit.jupiter.api.Assertions.assertFalse
+
 import scala.collection.JavaConverters._
 
 class TestCreateTable extends HoodieSparkSqlTestBase {
@@ -49,8 +51,11 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
          |  ts long
          | ) using hudi
          | tblproperties (
+         |   hoodie.database.name = "databaseName",
+         |   hoodie.table.name = "tableName",
          |   primaryKey = 'id',
-         |   preCombineField = 'ts'
+         |   preCombineField = 'ts',
+         |   hoodie.datasource.write.operation = 'upsert'
          | )
        """.stripMargin)
     val table = 
spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
@@ -65,6 +70,9 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
         StructField("price", DoubleType),
         StructField("ts", LongType))
     )(table.schema.fields)
+    
assertFalse(table.properties.contains(HoodieTableConfig.DATABASE_NAME.key()))
+    assertFalse(table.properties.contains(HoodieTableConfig.NAME.key()))
+    assertFalse(table.properties.contains(OPERATION.key()))
 
     val tablePath = table.storage.properties("path")
     val metaClient = HoodieTableMetaClient.builder()
@@ -73,6 +81,10 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
       .build()
     val tableConfig = metaClient.getTableConfig
     assertResult(databaseName)(tableConfig.getDatabaseName)
+    assertResult(tableName)(tableConfig.getTableName)
+    assertFalse(tableConfig.contains(OPERATION.key()))
+
+    spark.sql("use default")
   }
 
   test("Test Create Hoodie Table With Options") {
@@ -88,8 +100,11 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
          | ) using hudi
          | partitioned by (dt)
          | options (
+         |   hoodie.database.name = "databaseName",
+         |   hoodie.table.name = "tableName",
          |   primaryKey = 'id',
-         |   preCombineField = 'ts'
+         |   preCombineField = 'ts',
+         |   hoodie.datasource.write.operation = 'upsert'
          | )
        """.stripMargin)
     val table = 
spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
@@ -108,6 +123,9 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
         StructField("ts", LongType),
         StructField("dt", StringType))
     )(table.schema.fields)
+    
assertFalse(table.properties.contains(HoodieTableConfig.DATABASE_NAME.key()))
+    assertFalse(table.properties.contains(HoodieTableConfig.NAME.key()))
+    assertFalse(table.properties.contains(OPERATION.key()))
 
     val tablePath = table.storage.properties("path")
     val metaClient = HoodieTableMetaClient.builder()
@@ -120,6 +138,9 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
     assertResult("id")(tableConfig(HoodieTableConfig.RECORDKEY_FIELDS.key))
     assertResult("ts")(tableConfig(HoodieTableConfig.PRECOMBINE_FIELD.key))
     
assertResult(classOf[ComplexKeyGenerator].getCanonicalName)(tableConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key))
+    assertResult("default")(tableConfig(HoodieTableConfig.DATABASE_NAME.key()))
+    assertResult(tableName)(tableConfig(HoodieTableConfig.NAME.key()))
+    assertFalse(tableConfig.contains(OPERATION.key()))
   }
 
   test("Test Create External Hoodie Table") {
@@ -361,6 +382,84 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
     }
   }
 
+  test("Test Create Table As Select With Tblproperties For Filter Props") {
+    Seq("cow", "mor").foreach { tableType =>
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           | create table $tableName using hudi
+           | partitioned by (dt)
+           | tblproperties(
+           |    hoodie.database.name = "databaseName",
+           |    hoodie.table.name = "tableName",
+           |    primaryKey = 'id',
+           |    preCombineField = 'ts',
+           |    hoodie.datasource.write.operation = 'upsert',
+           |    type = '$tableType'
+           | )
+           | AS
+           | select 1 as id, 'a1' as name, 10 as price, '2021-04-01' as dt, 
1000 as ts
+         """.stripMargin
+      )
+      checkAnswer(s"select id, name, price, dt from $tableName")(
+        Seq(1, "a1", 10, "2021-04-01")
+      )
+      val table = 
spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
+      
assertFalse(table.properties.contains(HoodieTableConfig.DATABASE_NAME.key()))
+      assertFalse(table.properties.contains(HoodieTableConfig.NAME.key()))
+      assertFalse(table.properties.contains(OPERATION.key()))
+
+      val tablePath = table.storage.properties("path")
+      val metaClient = HoodieTableMetaClient.builder()
+        .setBasePath(tablePath)
+        .setConf(spark.sessionState.newHadoopConf())
+        .build()
+      val tableConfig = metaClient.getTableConfig.getProps.asScala.toMap
+      
assertResult("default")(tableConfig(HoodieTableConfig.DATABASE_NAME.key()))
+      assertResult(tableName)(tableConfig(HoodieTableConfig.NAME.key()))
+      assertFalse(tableConfig.contains(OPERATION.key()))
+    }
+  }
+
+  test("Test Create Table As Select With Options For Filter Props") {
+    Seq("cow", "mor").foreach { tableType =>
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           | create table $tableName using hudi
+           | partitioned by (dt)
+           | options(
+           |    hoodie.database.name = "databaseName",
+           |    hoodie.table.name = "tableName",
+           |    primaryKey = 'id',
+           |    preCombineField = 'ts',
+           |    hoodie.datasource.write.operation = 'upsert',
+           |    type = '$tableType'
+           | )
+           | AS
+           | select 1 as id, 'a1' as name, 10 as price, '2021-04-01' as dt, 
1000 as ts
+         """.stripMargin
+      )
+      checkAnswer(s"select id, name, price, dt from $tableName")(
+        Seq(1, "a1", 10, "2021-04-01")
+      )
+      val table = 
spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
+      
assertFalse(table.properties.contains(HoodieTableConfig.DATABASE_NAME.key()))
+      assertFalse(table.properties.contains(HoodieTableConfig.NAME.key()))
+      assertFalse(table.properties.contains(OPERATION.key()))
+
+      val tablePath = table.storage.properties("path")
+      val metaClient = HoodieTableMetaClient.builder()
+        .setBasePath(tablePath)
+        .setConf(spark.sessionState.newHadoopConf())
+        .build()
+      val tableConfig = metaClient.getTableConfig.getProps.asScala.toMap
+      
assertResult("default")(tableConfig(HoodieTableConfig.DATABASE_NAME.key()))
+      assertResult(tableName)(tableConfig(HoodieTableConfig.NAME.key()))
+      assertFalse(tableConfig.contains(OPERATION.key()))
+    }
+  }
+
   test("Test Create Table As Select when 'spark.sql.datetime.java8API.enabled' 
enables") {
     try {
       // enable spark.sql.datetime.java8API.enabled

Reply via email to