This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 4bd7cf428bdcfababab67e2458b62cf20133454c
Author: XuQianJin-Stars <[email protected]>
AuthorDate: Mon Jan 16 19:47:30 2023 +0800

    [HUDI-5548] spark sql update hudi's table properties
---
 .../command/ShowHoodieTablePropertiesCommand.scala | 61 ++++++++++++++++++++++
 .../spark/sql/hudi/analysis/HoodieAnalysis.scala   |  1 +
 .../org/apache/spark/sql/hudi/TestSpark3DDL.scala  |  9 ++++
 .../hudi/command/Spark31AlterTableCommand.scala    | 31 ++++++++++-
 .../sql/hudi/analysis/HoodieSpark3Analysis.scala   |  6 ++-
 .../spark/sql/hudi/command/AlterTableCommand.scala | 35 ++++++++++++-
 6 files changed, 138 insertions(+), 5 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/ShowHoodieTablePropertiesCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/ShowHoodieTablePropertiesCommand.scala
new file mode 100644
index 00000000000..8faa3354bea
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/ShowHoodieTablePropertiesCommand.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command
+
+import org.apache.hudi.SparkAdapterSupport.sparkAdapter
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.types.StringType
+import org.apache.spark.sql.{Row, SparkSession}
+
+import scala.collection.JavaConversions.mapAsScalaMap
+
+/**
+ * Command for show hudi table's properties.
+ */
+case class ShowHoodieTablePropertiesCommand(
+    tableIdentifier: TableIdentifier,
+    propertyKey: Option[String])
+  extends HoodieLeafRunnableCommand {
+
+  override val output: Seq[Attribute] = {
+    val schema = AttributeReference("value", StringType, nullable = false)() :: Nil
+    propertyKey match {
+      case None => AttributeReference("key", StringType, nullable = false)() :: schema
+      case _ => schema
+    }
+  }
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    if (!sparkAdapter.isHoodieTable(tableIdentifier, sparkSession)) {
+      Seq.empty[Row]
+    } else {
+      val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableIdentifier)
+      val tableProps = hoodieCatalogTable.metaClient.getTableConfig.getProps
+      propertyKey match {
+        case Some(p) =>
+          val propValue = tableProps
+            .getOrElse(p, s"Table ${tableIdentifier.unquotedString} does not have property: $p")
+          Seq(Row(propValue))
+        case None =>
+          tableProps.map(p => Row(p._1, p._2)).toSeq
+      }
+    }
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
index c8add030981..e42bddbd102 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
@@ -610,6 +610,7 @@ case class HoodiePostAnalysisRule(sparkSession: SparkSession) extends Rule[Logic
       case TruncateTableCommand(tableName, partitionSpec)
         if sparkAdapter.isHoodieTable(tableName, sparkSession) =>
           TruncateHoodieTableCommand(tableName, partitionSpec)
+      case s: ShowTablePropertiesCommand => ShowHoodieTablePropertiesCommand(s.table, s.propertyKey)
       case _ => plan
     }
   }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
index 65357b903b5..285fcc3f322 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
@@ -22,7 +22,10 @@ import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.common.testutils.RawTripTestPayload
 import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.sync.common.HoodieMetaSyncOperations
 import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkUtils}
+import org.apache.hudi.sync.common.HoodieMetaSyncOperations
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieSparkUtils}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.functions.{arrays_zip, col}
 import org.apache.spark.sql.{Row, SaveMode, SparkSession}
@@ -188,6 +191,12 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
         val meta = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
         assert(meta.comment.get.equals("it is a hudi table"))
         assert(Seq("key1", "key2").filter(meta.properties.contains(_)).size == 2)
+
+        // test show properties
+        assertResult(tableName) {
+          spark.sql(s"SHOW TBLPROPERTIES $tableName ('hoodie.table.name')").collect().apply(0).get(0)
+        }
+
         // test unset propertes
         spark.sql(s"alter table $tableName unset tblproperties(comment, 'key1', 'key2')")
         val unsetMeta = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/hudi/command/Spark31AlterTableCommand.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/hudi/command/Spark31AlterTableCommand.scala
index 529b5bb49ec..debcf0aadb8 100644
--- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/hudi/command/Spark31AlterTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/hudi/command/Spark31AlterTableCommand.scala
@@ -29,7 +29,7 @@ import org.apache.hudi.common.model.{HoodieCommitMetadata, WriteOperationType}
 import org.apache.hudi.{AvroConversionUtils, DataSourceOptionsHelper, DataSourceUtils}
 import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant}
 import org.apache.hudi.common.table.timeline.HoodieInstant.State
-import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.util.{CommitUtils, Option}
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.internal.schema.InternalSchema
@@ -48,6 +48,7 @@ import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, DeleteColu
 import org.apache.spark.sql.execution.command.RunnableCommand
 import org.apache.spark.sql.types.StructType
 
+import java.util.Properties
 import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
 
@@ -169,6 +170,12 @@ case class Spark31AlterTableCommand(table: CatalogTable, changes: Seq[TableChang
     val newProperties = table.properties.filter { case (k, _) => !propKeys.contains(k) }
     val newTable = table.copy(properties = newProperties, comment = tableComment)
     catalog.alterTable(newTable)
+
+    // delete hoodie table's config file
+    val deleteProps: util.Set[String] = new util.HashSet[String]()
+    propKeys.foreach(v => deleteProps.add(v))
+    Spark31AlterTableCommand.deleteTableProperties(sparkSession, deleteProps, table)
+
     logInfo("table properties change finished")
   }
@@ -183,6 +190,12 @@ case class Spark31AlterTableCommand(table: CatalogTable, changes: Seq[TableChang
       properties = table.properties ++ properties,
       comment = properties.get(TableCatalog.PROP_COMMENT).orElse(table.comment))
     catalog.alterTable(newTable)
+
+    // upserts the hoodie table's config file
+    val updatedProps = new Properties
+    properties.foreach(u => updatedProps.setProperty(u._1, u._2))
+    Spark31AlterTableCommand.updateTableProperties(sparkSession, updatedProps, table)
+
     logInfo("table properties change finished")
   }
@@ -320,5 +333,21 @@ object Spark31AlterTableCommand extends Logging {
       }
     }
   }
+
+  def updateTableProperties(sparkSession: SparkSession, updatedProps: Properties, table: CatalogTable): Any = {
+    val path = Spark31AlterTableCommand.getTableLocation(table, sparkSession)
+    val hadoopConf = sparkSession.sessionState.newHadoopConf()
+    val metaClient = HoodieTableMetaClient.builder().setBasePath(path)
+      .setConf(hadoopConf).build()
+    HoodieTableConfig.update(metaClient.getFs, new Path(metaClient.getMetaPath), updatedProps)
+  }
+
+  def deleteTableProperties(sparkSession: SparkSession, deletedProps: util.Set[String], table: CatalogTable): Any = {
+    val path = Spark31AlterTableCommand.getTableLocation(table, sparkSession)
+    val hadoopConf = sparkSession.sessionState.newHadoopConf()
+    val metaClient = HoodieTableMetaClient.builder().setBasePath(path)
+      .setConf(hadoopConf).build()
+    HoodieTableConfig.delete(metaClient.getFs, new Path(metaClient.getMetaPath), deletedProps)
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark3Analysis.scala b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark3Analysis.scala
index 1f11caedb8c..ab6b9ebbbd5 100644
--- a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark3Analysis.scala
+++ b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark3Analysis.scala
@@ -25,12 +25,14 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
 import org.apache.spark.sql.connector.catalog.{Table, V1Table}
+import org.apache.spark.sql.execution.command.ShowTablePropertiesCommand
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{castIfNeeded, removeMetaFields}
 import org.apache.spark.sql.hudi.ProvidesHoodieConfig
 import org.apache.spark.sql.hudi.catalog.HoodieInternalV2Table
-import org.apache.spark.sql.hudi.command.{AlterHoodieTableDropPartitionCommand, ShowHoodieTablePartitionsCommand, TruncateHoodieTableCommand}
+import org.apache.spark.sql.hudi.command.{AlterHoodieTableDropPartitionCommand, ShowHoodieTablePartitionsCommand, ShowHoodieTablePropertiesCommand, TruncateHoodieTableCommand}
+import org.apache.spark.sql.hudi.{HoodieSqlCommonUtils, ProvidesHoodieConfig}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{SQLContext, SparkSession}
@@ -150,7 +152,7 @@ case class HoodieSpark3PostAnalysisRule(sparkSession: SparkSession) extends Rule
           purge,
           retainData = true
         )
-
+      case s: ShowTablePropertiesCommand => ShowHoodieTablePropertiesCommand(s.table, s.propertyKey)
       case _ => plan
     }
   }
diff --git a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala
index bca3e7050c7..8095a26fc42 100644
--- a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala
@@ -29,7 +29,7 @@ import org.apache.hudi.common.model.{HoodieCommitMetadata, WriteOperationType}
 import org.apache.hudi.{DataSourceOptionsHelper, DataSourceUtils}
 import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant}
 import org.apache.hudi.common.table.timeline.HoodieInstant.State
-import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.util.{CommitUtils, Option}
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.internal.schema.InternalSchema
@@ -45,8 +45,11 @@ import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
 import org.apache.spark.sql.connector.catalog.{TableCatalog, TableChange}
 import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, DeleteColumn, RemoveProperty, SetProperty}
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
+import org.apache.spark.sql.hudi.command.AlterTableCommand.getTableLocation
 import org.apache.spark.sql.types.StructType
 
+import java.util.Properties
 import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
 
@@ -185,13 +188,19 @@ case class AlterTableCommand(table: CatalogTable, changes: Seq[TableChange], cha
     // ignore NonExist unset
     propKeys.foreach { k =>
       if (!table.properties.contains(k) && k != TableCatalog.PROP_COMMENT) {
-        logWarning(s"find non exist unset property: ${k} , ignore it")
+        logWarning(s"find non exist unset property: $k , ignore it")
       }
     }
     val tableComment = if (propKeys.contains(TableCatalog.PROP_COMMENT)) None else table.comment
     val newProperties = table.properties.filter { case (k, _) => !propKeys.contains(k) }
     val newTable = table.copy(properties = newProperties, comment = tableComment)
     catalog.alterTable(newTable)
+
+    // delete hoodie table's config file
+    val deleteProps: util.Set[String] = new util.HashSet[String]()
+    propKeys.foreach(v => deleteProps.add(v))
+    AlterTableCommand.deleteTableProperties(sparkSession, deleteProps, table)
+
     logInfo("table properties change finished")
   }
@@ -206,6 +215,12 @@ case class AlterTableCommand(table: CatalogTable, changes: Seq[TableChange], cha
       properties = table.properties ++ properties,
       comment = properties.get(TableCatalog.PROP_COMMENT).orElse(table.comment))
     catalog.alterTable(newTable)
+
+    // upserts the hoodie table's config file
+    val updatedProps = new Properties
+    properties.foreach(u => updatedProps.setProperty(u._1, u._2))
+    AlterTableCommand.updateTableProperties(sparkSession, updatedProps, table)
+
     logInfo("table properties change finished")
   }
@@ -343,5 +358,21 @@ object AlterTableCommand extends Logging {
       ASYNC_CLUSTERING_ENABLE.key -> ASYNC_CLUSTERING_ENABLE.defaultValue
     ) ++ DataSourceOptionsHelper.translateConfigurations(parameters)
   }
+
+  def updateTableProperties(sparkSession: SparkSession, updatedProps: Properties, table: CatalogTable): Any = {
+    val path = AlterTableCommand.getTableLocation(table, sparkSession)
+    val hadoopConf = sparkSession.sessionState.newHadoopConf()
+    val metaClient = HoodieTableMetaClient.builder().setBasePath(path)
+      .setConf(hadoopConf).build()
+    HoodieTableConfig.update(metaClient.getFs, new Path(metaClient.getMetaPath), updatedProps)
+  }
+
+  def deleteTableProperties(sparkSession: SparkSession, deletedProps: util.Set[String], table: CatalogTable): Any = {
+    val path = AlterTableCommand.getTableLocation(table, sparkSession)
+    val hadoopConf = sparkSession.sessionState.newHadoopConf()
+    val metaClient = HoodieTableMetaClient.builder().setBasePath(path)
+      .setConf(hadoopConf).build()
+    HoodieTableConfig.delete(metaClient.getFs, new Path(metaClient.getMetaPath), deletedProps)
+  }
 }
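
Usage sketch (not part of the commit): after this change, SHOW TBLPROPERTIES
on a Hudi table resolves keys from .hoodie/hoodie.properties via
HoodieTableConfig, and ALTER TABLE SET/UNSET TBLPROPERTIES writes through to
that file in addition to the session catalog. The snippet below is a minimal
illustration, assuming Spark 3.x with the matching Hudi Spark bundle on the
classpath; the table name h0, the local master, and the key 'key1' are
hypothetical:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("hudi-tblproperties-sketch")
      // registers the post-analysis rules patched in this commit
      .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
      // on Spark 3.2+ DDL is routed through HoodieCatalog
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()

    spark.sql(
      """create table h0 (id int, name string, ts long)
        |using hudi
        |tblproperties (primaryKey = 'id', preCombineField = 'ts')""".stripMargin)

    // Rewritten to ShowHoodieTablePropertiesCommand: hoodie.table.name lives
    // only in hoodie.properties, so this now returns the table name rather
    // than a missing-property message.
    spark.sql("show tblproperties h0 ('hoodie.table.name')").show(false)

    // SetProperty now also upserts the key into hoodie.properties
    // through HoodieTableConfig.update.
    spark.sql("alter table h0 set tblproperties ('key1' = 'value1')")

    // RemoveProperty now also removes the key from hoodie.properties
    // through HoodieTableConfig.delete.
    spark.sql("alter table h0 unset tblproperties ('key1')")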