This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch release-0.12.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit d3b0ee7ee616c2c4529ad4763bbe0ea304498bc1
Author: Yann Byron <[email protected]>
AuthorDate: Thu Aug 4 11:05:24 2022 +0800

    [HUDI-4487] support to create ro/rt table by spark sql (#6262)
---
 .../sql/catalyst/catalog/HoodieCatalogTable.scala  |   6 +-
 .../hudi/command/CreateHoodieTableCommand.scala    |  33 ++++--
 .../command/CreateHoodieTableAsSelectCommand.scala |   9 +-
 .../apache/spark/sql/hudi/TestCreateTable.scala    | 132 +++++++++++++++++++++
 4 files changed, 169 insertions(+), 11 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
index 3dbb358fbb..e27c15ebcf 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
@@ -39,10 +39,12 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 /**
- * Table definition for SQL funcitonalities. Depending on the way of data 
generation,
+ * Table definition for SQL functionalities. Depending on the way of data 
generation,
  * meta of Hudi table can be from Spark catalog or meta directory on 
filesystem.
  * [[HoodieCatalogTable]] takes both meta sources into consideration when 
handling
  * EXTERNAL and MANAGED tables.
+ *
+ * NOTE: all the meta should be retrieved from meta directory on filesystem 
first.
  */
 class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) 
extends Logging {
 
@@ -53,7 +55,7 @@ class HoodieCatalogTable(val spark: SparkSession, var table: 
CatalogTable) exten
   /**
    * database.table in catalog
    */
-  val catalogTableName = table.qualifiedName
+  val catalogTableName: String = table.qualifiedName
 
   /**
    * properties defined in catalog.
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
index 75803fd779..d8907084e9 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
@@ -18,12 +18,15 @@
 package org.apache.spark.sql.hudi.command
 
 import org.apache.hadoop.fs.Path
-import org.apache.hudi.common.model.HoodieFileFormat
+
+import org.apache.hudi.common.model.{HoodieFileFormat, HoodieTableType}
 import org.apache.hudi.common.table.HoodieTableConfig
 import org.apache.hudi.hadoop.HoodieParquetInputFormat
 import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat
 import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils
+import org.apache.hudi.sync.common.util.ConfigUtils
 import org.apache.hudi.{DataSourceWriteOptions, SparkAdapterSupport}
+
 import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable.needFilterProps
@@ -33,7 +36,7 @@ import 
org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isUsingHiveCatalog
 import org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSqlCommonUtils}
 import 
org.apache.spark.sql.internal.StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.{SPARK_VERSION, SparkConf}
 
 import scala.collection.JavaConverters._
@@ -62,12 +65,22 @@ case class CreateHoodieTableCommand(table: CatalogTable, 
ignoreIfExists: Boolean
     // check if there are conflict between table configs defined in hoodie 
table and properties defined in catalog.
     CreateHoodieTableCommand.validateTblProperties(hoodieCatalogTable)
 
-    // init hoodie table
-    hoodieCatalogTable.initHoodieTable()
+    val queryAsProp = 
hoodieCatalogTable.catalogProperties.get(ConfigUtils.IS_QUERY_AS_RO_TABLE)
+    if (queryAsProp.isEmpty) {
+      // init hoodie table for a normal table (not a ro/rt table)
+      hoodieCatalogTable.initHoodieTable()
+    } else {
+      if (!hoodieCatalogTable.hoodieTableExists) {
+        throw new AnalysisException("Creating ro/rt table need the existence 
of the base table.")
+      }
+      if (HoodieTableType.MERGE_ON_READ != hoodieCatalogTable.tableType) {
+        throw new AnalysisException("Creating ro/rt table should only apply to 
a mor table.")
+      }
+    }
 
     try {
       // create catalog table for this hoodie table
-      CreateHoodieTableCommand.createTableInCatalog(sparkSession, 
hoodieCatalogTable, ignoreIfExists)
+      CreateHoodieTableCommand.createTableInCatalog(sparkSession, 
hoodieCatalogTable, ignoreIfExists, queryAsProp)
     } catch {
       case NonFatal(e) =>
         logWarning(s"Failed to create catalog table in metastore: 
${e.getMessage}")
@@ -92,8 +105,11 @@ object CreateHoodieTableCommand {
     }
   }
 
-  def createTableInCatalog(sparkSession: SparkSession,
-      hoodieCatalogTable: HoodieCatalogTable, ignoreIfExists: Boolean): Unit = 
{
+  def createTableInCatalog(
+      sparkSession: SparkSession,
+      hoodieCatalogTable: HoodieCatalogTable,
+      ignoreIfExists: Boolean,
+      queryAsProp: Option[String] = None): Unit = {
     val table = hoodieCatalogTable.table
     assert(table.tableType != CatalogTableType.VIEW)
 
@@ -121,7 +137,8 @@ object CreateHoodieTableCommand {
       Some(outputFormat),
       Some(serdeFormat),
       table.storage.compressed,
-      storageProperties + ("path" -> path))
+      storageProperties + ("path" -> path) ++ 
queryAsProp.map(ConfigUtils.IS_QUERY_AS_RO_TABLE -> _)
+    )
 
     val tableName = HoodieSqlCommonUtils.formatName(sparkSession, 
table.identifier.table)
     val newDatabaseName = HoodieSqlCommonUtils.formatName(sparkSession, 
table.identifier.database
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
index 0c8714b7b9..b43d3a3f85 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
@@ -19,16 +19,18 @@ package org.apache.spark.sql.hudi.command
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
+
 import org.apache.hudi.DataSourceWriteOptions
 import org.apache.hudi.hive.HiveSyncConfigHolder
 import org.apache.hudi.sql.InsertMode
 import org.apache.hudi.sync.common.util.ConfigUtils
+
 import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable.needFilterProps
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, 
CatalogTable, CatalogTableType, HoodieCatalogTable}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
-import org.apache.spark.sql.{Row, SaveMode, SparkSession}
+import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}
 
 import scala.collection.JavaConverters._
 
@@ -45,6 +47,11 @@ case class CreateHoodieTableAsSelectCommand(
     assert(table.tableType != CatalogTableType.VIEW)
     assert(table.provider.isDefined)
 
+    val hasQueryAsProp = (table.storage.properties ++ 
table.properties).contains(ConfigUtils.IS_QUERY_AS_RO_TABLE)
+    if (hasQueryAsProp) {
+      throw new AnalysisException("Not support CTAS for the ro/rt table")
+    }
+
     val sessionState = sparkSession.sessionState
     val db = 
table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
     val tableIdentWithDB = table.identifier.copy(database = Some(db))
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
index 7091de4a8e..70848529dc 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
@@ -382,6 +382,138 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
     }
   }
 
+  test("Test Create ro/rt Table In The Right Way") {
+    withTempDir { tmp =>
+      val parentPath = tmp.getCanonicalPath
+      val tableName1 = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName1 (
+           |  id int,
+           |  name string,
+           |  ts long
+           |) using hudi
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts',
+           |  type = 'mor'
+           | )
+           | location '$parentPath/$tableName1'
+       """.stripMargin)
+      spark.sql(s"insert into $tableName1 values (1, 'a1', 1000)")
+      spark.sql(s"insert into $tableName1 values (1, 'a2', 1100)")
+
+      // drop ro and rt table, and recreate them
+      val roTableName1 = tableName1 + "_ro"
+      val rtTableName1 = tableName1 + "_rt"
+      spark.sql(
+        s"""
+           |create table $roTableName1
+           |using hudi
+           |tblproperties (
+           | 'hoodie.query.as.ro.table' = 'true'
+           |)
+           |location '$parentPath/$tableName1'
+           |""".stripMargin
+      )
+      val roCatalogTable = 
spark.sessionState.catalog.getTableMetadata(TableIdentifier(roTableName1))
+      assertResult(roCatalogTable.properties("type"))("mor")
+      assertResult(roCatalogTable.properties("primaryKey"))("id")
+      assertResult(roCatalogTable.properties("preCombineField"))("ts")
+      
assertResult(roCatalogTable.storage.properties("hoodie.query.as.ro.table"))("true")
+      checkAnswer(s"select id, name, ts from $roTableName1")(
+        Seq(1, "a1", 1000)
+      )
+
+      spark.sql(
+        s"""
+           |create table $rtTableName1
+           |using hudi
+           |tblproperties (
+           | 'hoodie.query.as.ro.table' = 'false'
+           |)
+           |location '$parentPath/$tableName1'
+           |""".stripMargin
+      )
+      val rtCatalogTable = 
spark.sessionState.catalog.getTableMetadata(TableIdentifier(rtTableName1))
+      assertResult(rtCatalogTable.properties("type"))("mor")
+      assertResult(rtCatalogTable.properties("primaryKey"))("id")
+      assertResult(rtCatalogTable.properties("preCombineField"))("ts")
+      
assertResult(rtCatalogTable.storage.properties("hoodie.query.as.ro.table"))("false")
+      checkAnswer(s"select id, name, ts from $rtTableName1")(
+        Seq(1, "a2", 1100)
+      )
+    }
+  }
+
+  test("Test Create ro/rt Table In The Wrong Way") {
+    withTempDir { tmp =>
+      val parentPath = tmp.getCanonicalPath
+
+      // test the case that create ro/rt table on cow table
+      val tableName1 = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName1 (
+           |  id int,
+           |  name string,
+           |  ts long
+           |) using hudi
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts',
+           |  type = 'cow'
+           | )
+           | location '$parentPath/$tableName1'
+     """.stripMargin)
+      spark.sql(s"insert into $tableName1 values (1, 'a1', 1000)")
+      spark.sql(s"insert into $tableName1 values (1, 'a2', 1100)")
+
+      val roTableName1 = tableName1 + "_ro"
+      checkExceptionContain(
+        s"""
+           |create table $roTableName1
+           |using hudi
+           |tblproperties (
+           | 'hoodie.query.as.ro.table' = 'true'
+           |)
+           |location '$parentPath/$tableName1'
+           |""".stripMargin
+      )("Creating ro/rt table should only apply to a mor table.")
+
+      // test the case that create ro/rt table on a nonexistent table
+      val tableName2 = generateTableName
+      val rtTableName2 = tableName2 + "_rt"
+      checkExceptionContain(
+        s"""
+           |create table $rtTableName2
+           |using hudi
+           |tblproperties (
+           | 'hoodie.query.as.ro.table' = 'true'
+           |)
+           |location '$parentPath/$tableName2'
+           |""".stripMargin
+      )("Creating ro/rt table need the existence of the base table.")
+
+      // test the case that CTAS
+      val tableName3 = generateTableName
+      checkExceptionContain(
+        s"""
+           | create table $tableName3 using hudi
+           | tblproperties(
+           |    primaryKey = 'id',
+           |    preCombineField = 'ts',
+           |    type = 'mor',
+           |    'hoodie.query.as.ro.table' = 'true'
+           | )
+           | location '$parentPath/$tableName3'
+           | AS
+           | select 1 as id, 'a1' as name, 1000 as ts
+           | """.stripMargin
+      )("Not support CTAS for the ro/rt table")
+    }
+  }
+
   test("Test Create Table As Select With Tblproperties For Filter Props") {
     Seq("cow", "mor").foreach { tableType =>
       val tableName = generateTableName

Reply via email to