This is an automated email from the ASF dual-hosted git repository. codope pushed a commit to branch release-0.12.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit d3b0ee7ee616c2c4529ad4763bbe0ea304498bc1 Author: Yann Byron <[email protected]> AuthorDate: Thu Aug 4 11:05:24 2022 +0800 [HUDI-4487] support to create ro/rt table by spark sql (#6262) --- .../sql/catalyst/catalog/HoodieCatalogTable.scala | 6 +- .../hudi/command/CreateHoodieTableCommand.scala | 33 ++++-- .../command/CreateHoodieTableAsSelectCommand.scala | 9 +- .../apache/spark/sql/hudi/TestCreateTable.scala | 132 +++++++++++++++++++++ 4 files changed, 169 insertions(+), 11 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala index 3dbb358fbb..e27c15ebcf 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala @@ -39,10 +39,12 @@ import scala.collection.JavaConverters._ import scala.collection.mutable /** - * Table definition for SQL funcitonalities. Depending on the way of data generation, + * Table definition for SQL functionalities. Depending on the way of data generation, * meta of Hudi table can be from Spark catalog or meta directory on filesystem. * [[HoodieCatalogTable]] takes both meta sources into consideration when handling * EXTERNAL and MANAGED tables. + * + * NOTE: all the meta should be retrieved from meta directory on filesystem first. */ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) extends Logging { @@ -53,7 +55,7 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten /** * database.table in catalog */ - val catalogTableName = table.qualifiedName + val catalogTableName: String = table.qualifiedName /** * properties defined in catalog. 
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala index 75803fd779..d8907084e9 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala @@ -18,12 +18,15 @@ package org.apache.spark.sql.hudi.command import org.apache.hadoop.fs.Path -import org.apache.hudi.common.model.HoodieFileFormat + +import org.apache.hudi.common.model.{HoodieFileFormat, HoodieTableType} import org.apache.hudi.common.table.HoodieTableConfig import org.apache.hudi.hadoop.HoodieParquetInputFormat import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils +import org.apache.hudi.sync.common.util.ConfigUtils import org.apache.hudi.{DataSourceWriteOptions, SparkAdapterSupport} + import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable.needFilterProps @@ -33,7 +36,7 @@ import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isUsingHiveCatalog import org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSqlCommonUtils} import org.apache.spark.sql.internal.StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.{SPARK_VERSION, SparkConf} import scala.collection.JavaConverters._ @@ -62,12 +65,22 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean // check if there are conflict between table configs defined in 
hoodie table and properties defined in catalog. CreateHoodieTableCommand.validateTblProperties(hoodieCatalogTable) - // init hoodie table - hoodieCatalogTable.initHoodieTable() + val queryAsProp = hoodieCatalogTable.catalogProperties.get(ConfigUtils.IS_QUERY_AS_RO_TABLE) + if (queryAsProp.isEmpty) { + // init hoodie table for a normal table (not a ro/rt table) + hoodieCatalogTable.initHoodieTable() + } else { + if (!hoodieCatalogTable.hoodieTableExists) { + throw new AnalysisException("Creating ro/rt table need the existence of the base table.") + } + if (HoodieTableType.MERGE_ON_READ != hoodieCatalogTable.tableType) { + throw new AnalysisException("Creating ro/rt table should only apply to a mor table.") + } + } try { // create catalog table for this hoodie table - CreateHoodieTableCommand.createTableInCatalog(sparkSession, hoodieCatalogTable, ignoreIfExists) + CreateHoodieTableCommand.createTableInCatalog(sparkSession, hoodieCatalogTable, ignoreIfExists, queryAsProp) } catch { case NonFatal(e) => logWarning(s"Failed to create catalog table in metastore: ${e.getMessage}") @@ -92,8 +105,11 @@ object CreateHoodieTableCommand { } } - def createTableInCatalog(sparkSession: SparkSession, - hoodieCatalogTable: HoodieCatalogTable, ignoreIfExists: Boolean): Unit = { + def createTableInCatalog( + sparkSession: SparkSession, + hoodieCatalogTable: HoodieCatalogTable, + ignoreIfExists: Boolean, + queryAsProp: Option[String] = None): Unit = { val table = hoodieCatalogTable.table assert(table.tableType != CatalogTableType.VIEW) @@ -121,7 +137,8 @@ object CreateHoodieTableCommand { Some(outputFormat), Some(serdeFormat), table.storage.compressed, - storageProperties + ("path" -> path)) + storageProperties + ("path" -> path) ++ queryAsProp.map(ConfigUtils.IS_QUERY_AS_RO_TABLE -> _) + ) val tableName = HoodieSqlCommonUtils.formatName(sparkSession, table.identifier.table) val newDatabaseName = HoodieSqlCommonUtils.formatName(sparkSession, table.identifier.database diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala index 0c8714b7b9..b43d3a3f85 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala @@ -19,16 +19,18 @@ package org.apache.spark.sql.hudi.command import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path + import org.apache.hudi.DataSourceWriteOptions import org.apache.hudi.hive.HiveSyncConfigHolder import org.apache.hudi.sql.InsertMode import org.apache.hudi.sync.common.util.ConfigUtils + import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable.needFilterProps import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, HoodieCatalogTable} import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} import org.apache.spark.sql.hudi.HoodieSqlCommonUtils -import org.apache.spark.sql.{Row, SaveMode, SparkSession} +import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession} import scala.collection.JavaConverters._ @@ -45,6 +47,11 @@ case class CreateHoodieTableAsSelectCommand( assert(table.tableType != CatalogTableType.VIEW) assert(table.provider.isDefined) + val hasQueryAsProp = (table.storage.properties ++ table.properties).contains(ConfigUtils.IS_QUERY_AS_RO_TABLE) + if (hasQueryAsProp) { + throw new AnalysisException("Not support CTAS for the ro/rt table") + } + val sessionState = sparkSession.sessionState val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase) val tableIdentWithDB = table.identifier.copy(database = Some(db)) diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala index 7091de4a8e..70848529dc 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala @@ -382,6 +382,138 @@ class TestCreateTable extends HoodieSparkSqlTestBase { } } + test("Test Create ro/rt Table In The Right Way") { + withTempDir { tmp => + val parentPath = tmp.getCanonicalPath + val tableName1 = generateTableName + spark.sql( + s""" + |create table $tableName1 ( + | id int, + | name string, + | ts long + |) using hudi + | tblproperties ( + | primaryKey = 'id', + | preCombineField = 'ts', + | type = 'mor' + | ) + | location '$parentPath/$tableName1' + """.stripMargin) + spark.sql(s"insert into $tableName1 values (1, 'a1', 1000)") + spark.sql(s"insert into $tableName1 values (1, 'a2', 1100)") + + // drop ro and rt table, and recreate them + val roTableName1 = tableName1 + "_ro" + val rtTableName1 = tableName1 + "_rt" + spark.sql( + s""" + |create table $roTableName1 + |using hudi + |tblproperties ( + | 'hoodie.query.as.ro.table' = 'true' + |) + |location '$parentPath/$tableName1' + |""".stripMargin + ) + val roCatalogTable = spark.sessionState.catalog.getTableMetadata(TableIdentifier(roTableName1)) + assertResult(roCatalogTable.properties("type"))("mor") + assertResult(roCatalogTable.properties("primaryKey"))("id") + assertResult(roCatalogTable.properties("preCombineField"))("ts") + assertResult(roCatalogTable.storage.properties("hoodie.query.as.ro.table"))("true") + checkAnswer(s"select id, name, ts from $roTableName1")( + Seq(1, "a1", 1000) + ) + + spark.sql( + s""" + |create table $rtTableName1 + |using hudi + |tblproperties ( + | 'hoodie.query.as.ro.table' = 'false' + |) + |location '$parentPath/$tableName1' + 
|""".stripMargin + ) + val rtCatalogTable = spark.sessionState.catalog.getTableMetadata(TableIdentifier(rtTableName1)) + assertResult(rtCatalogTable.properties("type"))("mor") + assertResult(rtCatalogTable.properties("primaryKey"))("id") + assertResult(rtCatalogTable.properties("preCombineField"))("ts") + assertResult(rtCatalogTable.storage.properties("hoodie.query.as.ro.table"))("false") + checkAnswer(s"select id, name, ts from $rtTableName1")( + Seq(1, "a2", 1100) + ) + } + } + + test("Test Create ro/rt Table In The Wrong Way") { + withTempDir { tmp => + val parentPath = tmp.getCanonicalPath + + // test the case that create ro/rt table on cow table + val tableName1 = generateTableName + spark.sql( + s""" + |create table $tableName1 ( + | id int, + | name string, + | ts long + |) using hudi + | tblproperties ( + | primaryKey = 'id', + | preCombineField = 'ts', + | type = 'cow' + | ) + | location '$parentPath/$tableName1' + """.stripMargin) + spark.sql(s"insert into $tableName1 values (1, 'a1', 1000)") + spark.sql(s"insert into $tableName1 values (1, 'a2', 1100)") + + val roTableName1 = tableName1 + "_ro" + checkExceptionContain( + s""" + |create table $roTableName1 + |using hudi + |tblproperties ( + | 'hoodie.query.as.ro.table' = 'true' + |) + |location '$parentPath/$tableName1' + |""".stripMargin + )("Creating ro/rt table should only apply to a mor table.") + + // test the case that create ro/rt table on a nonexistent table + val tableName2 = generateTableName + val rtTableName2 = tableName2 + "_rt" + checkExceptionContain( + s""" + |create table $rtTableName2 + |using hudi + |tblproperties ( + | 'hoodie.query.as.ro.table' = 'true' + |) + |location '$parentPath/$tableName2' + |""".stripMargin + )("Creating ro/rt table need the existence of the base table.") + + // test the case that CTAS + val tableName3 = generateTableName + checkExceptionContain( + s""" + | create table $tableName3 using hudi + | tblproperties( + | primaryKey = 'id', + | preCombineField = 'ts', 
+ | type = 'mor', + | 'hoodie.query.as.ro.table' = 'true' + | ) + | location '$parentPath/$tableName3' + | AS + | select 1 as id, 'a1' as name, 1000 as ts + | """.stripMargin + )("Not support CTAS for the ro/rt table") + } + } + test("Test Create Table As Select With Tblproperties For Filter Props") { Seq("cow", "mor").foreach { tableType => val tableName = generateTableName
