This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 6e84cfe3d41 [HUDI-6671] Support 'alter table add partition' sql (#9408)
6e84cfe3d41 is described below
commit 6e84cfe3d41f163ab653fea0309489f3e0c76215
Author: Wechar Yu <[email protected]>
AuthorDate: Mon Aug 28 09:52:56 2023 +0800
[HUDI-6671] Support 'alter table add partition' sql (#9408)
---
.../spark/sql/hudi/HoodieSqlCommonUtils.scala | 41 ++--
.../AlterHoodieTableAddPartitionCommand.scala | 94 +++++++++
.../spark/sql/hudi/analysis/HoodieAnalysis.scala | 5 +-
.../sql/hudi/TestAlterTableAddPartition.scala | 228 +++++++++++++++++++++
4 files changed, 353 insertions(+), 15 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
index bad2784e1fd..d5f46936be5 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
@@ -330,23 +330,36 @@ object HoodieSqlCommonUtils extends SparkAdapterSupport {
val allPartitionPaths = hoodieCatalogTable.getPartitionPaths
val enableHiveStylePartitioning =
isHiveStyledPartitioning(allPartitionPaths, table)
val enableEncodeUrl = isUrlEncodeEnabled(allPartitionPaths, table)
- val partitionsToDrop = normalizedSpecs.map { spec =>
- hoodieCatalogTable.partitionFields.map { partitionColumn =>
- val encodedPartitionValue = if (enableEncodeUrl) {
- PartitionPathEncodeUtils.escapePathName(spec(partitionColumn))
- } else {
- spec(partitionColumn)
- }
- if (enableHiveStylePartitioning) {
- partitionColumn + "=" + encodedPartitionValue
- } else {
- encodedPartitionValue
- }
- }.mkString("/")
- }.mkString(",")
+ val partitionFields = hoodieCatalogTable.partitionFields
+ val partitionsToDrop = normalizedSpecs.map(
+ makePartitionPath(partitionFields, _, enableEncodeUrl,
enableHiveStylePartitioning)
+ ).mkString(",")
partitionsToDrop
}
+  /**
+   * Builds the relative partition path for ONE already-normalized partition spec.
+   *
+   * @param partitionFields             partition columns of the table, in path order
+   * @param normalizedSpecs             partition column name -> value for a single partition;
+   *                                    every entry of `partitionFields` is looked up in it
+   * @param enableEncodeUrl             escape each value with `PartitionPathEncodeUtils.escapePathName`
+   * @param enableHiveStylePartitioning render each segment as `column=value` instead of `value`
+   * @return the segments joined with '/', e.g. `year=2023/month=08/day=01` or `2023/08/01`
+   * @throws NoSuchElementException if a partition column is missing from `normalizedSpecs`
+   */
+  private def makePartitionPath(partitionFields: Seq[String],
+                                normalizedSpecs: Map[String, String],
+                                enableEncodeUrl: Boolean,
+                                enableHiveStylePartitioning: Boolean): String = {
+    val segments = for (column <- partitionFields) yield {
+      val rawValue = normalizedSpecs(column)
+      val pathValue =
+        if (enableEncodeUrl) PartitionPathEncodeUtils.escapePathName(rawValue) else rawValue
+      if (enableHiveStylePartitioning) s"$column=$pathValue" else pathValue
+    }
+    segments.mkString("/")
+  }
+
+  /**
+   * Builds the relative partition path of one partition spec for the given Hudi table. The path
+   * layout (hive-style segments, URL-encoded values) follows the table's own config.
+   *
+   * @param hoodieCatalogTable the target table; supplies partition fields and table config
+   * @param normalizedSpecs    partition column name -> value for a single partition
+   * @return the relative partition path, e.g. `dt=2023-08-01` for a hive-style table
+   */
+  def makePartitionPath(hoodieCatalogTable: HoodieCatalogTable,
+                        normalizedSpecs: Map[String, String]): String = {
+    val config = hoodieCatalogTable.tableConfig
+    // The flags come back as strings; parseBoolean treats null or any non-"true" value as false.
+    val hiveStyle = java.lang.Boolean.parseBoolean(config.getHiveStylePartitioningEnable)
+    val urlEncode = java.lang.Boolean.parseBoolean(config.getUrlEncodePartitioning)
+    makePartitionPath(hoodieCatalogTable.partitionFields, normalizedSpecs, urlEncode, hiveStyle)
+  }
+
private def validateInstant(queryInstant: String): Unit = {
// Provided instant has to either
// - Match one of the bootstrapping instants
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddPartitionCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddPartitionCommand.scala
new file mode 100644
index 00000000000..e36f8d5f6f1
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddPartitionCommand.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command
+
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.HoodiePartitionMetadata
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition,
HoodieCatalogTable}
+import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{makePartitionPath,
normalizePartitionSpec}
+
+import scala.util.control.NonFatal
+
+/**
+ * Hudi implementation of `ALTER TABLE ... ADD [IF NOT EXISTS] PARTITION ...`.
+ *
+ * For every requested partition it writes a Hudi partition metafile under the table base path
+ * (unless one already exists), then registers the partitions in the session catalog.
+ *
+ * @param tableIdentifier       the Hudi table to alter
+ * @param partitionSpecsAndLocs requested partition specs; an explicit location is rejected
+ * @param ifNotExists           when true, partitions whose metafile already exists are skipped
+ *                              instead of failing the command
+ */
+case class AlterHoodieTableAddPartitionCommand(
+    tableIdentifier: TableIdentifier,
+    partitionSpecsAndLocs: Seq[(TablePartitionSpec, Option[String])],
+    ifNotExists: Boolean)
+  extends HoodieLeafRunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    logInfo(s"start execute alter table add partition command for $tableIdentifier")
+
+    val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableIdentifier)
+
+    if (!hoodieCatalogTable.isPartitionedTable) {
+      throw new AnalysisException(s"$tableIdentifier is a non-partitioned table that is not allowed to add partition")
+    }
+
+    val catalog = sparkSession.sessionState.catalog
+    val table = hoodieCatalogTable.table
+    DDLUtils.verifyAlterTableType(catalog, table, isView = false)
+
+    val normalizedSpecs: Seq[Map[String, String]] = partitionSpecsAndLocs.map { case (spec, location) =>
+      if (location.isDefined) {
+        throw new AnalysisException("Hoodie table does not support specify partition location explicitly")
+      }
+      normalizePartitionSpec(
+        spec,
+        hoodieCatalogTable.partitionFields,
+        hoodieCatalogTable.tableName,
+        sparkSession.sessionState.conf.resolver)
+    }
+
+    // Read the batch size from the session SQL conf rather than the SparkContext conf, so that a
+    // session-level `SET spark.sql.addPartitionInBatch.size=...` is honored. Validate it before
+    // writing anything: `grouped` rejects non-positive sizes. That failure would otherwise be
+    // swallowed by the NonFatal catch below and logged as a misleading catalog warning.
+    val batchSizeKey = "spark.sql.addPartitionInBatch.size"
+    val batchSize = sparkSession.sessionState.conf.getConfString(batchSizeKey, "100").toInt
+    require(batchSize > 0, s"$batchSizeKey must be positive, but got $batchSize")
+
+    val basePath = new Path(hoodieCatalogTable.tableLocation)
+    val fileSystem = hoodieCatalogTable.metaClient.getFs
+    val instantTime = HoodieActiveTimeline.createNewInstantTime
+    val format = hoodieCatalogTable.tableConfig.getPartitionMetafileFormat
+    // Every spec's existence check runs inside this map, before any `trySave` call below. So
+    // without IF NOT EXISTS, one conflicting partition aborts the statement before any metafile
+    // is saved.
+    val (partitionMetadata, parts) = normalizedSpecs.map { spec =>
+      val partitionPath = makePartitionPath(hoodieCatalogTable, spec)
+      val fullPartitionPath: Path = FSUtils.getPartitionPath(basePath, partitionPath)
+      val metadata = if (HoodiePartitionMetadata.hasPartitionMetadata(fileSystem, fullPartitionPath)) {
+        if (!ifNotExists) {
+          throw new AnalysisException(s"Partition metadata already exists for path: $fullPartitionPath")
+        }
+        None
+      } else {
+        Some(new HoodiePartitionMetadata(fileSystem, instantTime, basePath, fullPartitionPath, format))
+      }
+      (metadata, CatalogTablePartition(spec, table.storage.copy(locationUri = Some(fullPartitionPath.toUri))))
+    }.unzip
+    partitionMetadata.flatten.foreach(_.trySave(0))
+
+    // Sync new partitions in batches. ignoreIfExists keeps existing catalog partitions silent.
+    // This sync is best-effort: the Hudi partition metafiles above are already persisted.
+    try {
+      parts.grouped(batchSize).foreach { batch =>
+        catalog.createPartitions(tableIdentifier, batch, ignoreIfExists = true)
+      }
+    } catch {
+      case NonFatal(e) =>
+        logWarning("Failed to add partitions in external catalog", e)
+    }
+    sparkSession.catalog.refreshTable(tableIdentifier.unquotedString)
+
+    Seq.empty[Row]
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
index 24820c1c032..eb7f00ef26b 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
@@ -518,11 +518,14 @@ case class HoodiePostAnalysisRule(sparkSession: SparkSession) extends Rule[Logic
if sparkSession.sessionState.catalog.tableExists(tableName)
&& sparkAdapter.isHoodieTable(tableName, sparkSession) =>
DropHoodieTableCommand(tableName, ifExists, false, purge)
+ // Rewrite the AlterTableAddPartitionCommand to
AlterHoodieTableAddPartitionCommand
+ case AlterTableAddPartitionCommand(tableName, partitionSpecsAndLocs,
ifNotExists)
+ if sparkAdapter.isHoodieTable(tableName, sparkSession) =>
+ AlterHoodieTableAddPartitionCommand(tableName, partitionSpecsAndLocs,
ifNotExists)
// Rewrite the AlterTableDropPartitionCommand to
AlterHoodieTableDropPartitionCommand
case AlterTableDropPartitionCommand(tableName, specs, ifExists, purge,
retainData)
if sparkAdapter.isHoodieTable(tableName, sparkSession) =>
AlterHoodieTableDropPartitionCommand(tableName, specs, ifExists,
purge, retainData)
- // Rewrite the AlterTableRenameCommand to AlterHoodieTableRenameCommand
// Rewrite the AlterTableAddColumnsCommand to
AlterHoodieTableAddColumnsCommand
case AlterTableAddColumnsCommand(tableId, colsToAdd)
if sparkAdapter.isHoodieTable(tableId, sparkSession) =>
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableAddPartition.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableAddPartition.scala
new file mode 100644
index 00000000000..68c6c115448
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableAddPartition.scala
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+/**
+ * Tests for `ALTER TABLE ... ADD [IF NOT EXISTS] PARTITION` on Hudi tables, covering
+ * non-partitioned tables, explicit locations, IF NOT EXISTS, multiple specs per statement,
+ * hive-style partitioning and URL-encoded partition values.
+ */
+class TestAlterTableAddPartition extends HoodieSparkSqlTestBase {
+
+  test("Add partition for non-partitioned table") {
+    withTable(generateTableName){ tableName =>
+      // create table
+      spark.sql(
+        s"""
+           | create table $tableName (
+           |  id bigint,
+           |  name string,
+           |  ts string,
+           |  dt string
+           | )
+           | using hudi
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+           |""".stripMargin)
+
+      checkExceptionContain(s"alter table $tableName add partition (dt='2023-08-01')")(
+        s"`$tableName` is a non-partitioned table that is not allowed to add partition")
+
+      // show partitions
+      checkAnswer(s"show partitions $tableName")(Seq.empty: _*)
+    }
+  }
+
+  test("Add partition with location") {
+    withTable(generateTableName){ tableName =>
+      // create table
+      spark.sql(
+        s"""
+           | create table $tableName (
+           |  id bigint,
+           |  name string,
+           |  ts string,
+           |  dt string
+           | )
+           | using hudi
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+           | partitioned by (dt)
+           |""".stripMargin)
+
+      checkExceptionContain(s"alter table $tableName add partition (dt='2023-08-01') location '/tmp/path'")(
+        "Hoodie table does not support specify partition location explicitly")
+
+      // show partitions
+      checkAnswer(s"show partitions $tableName")(Seq.empty: _*)
+    }
+  }
+
+  test("Add partition if not exists") {
+    withTable(generateTableName){ tableName =>
+      // create table
+      spark.sql(
+        s"""
+           | create table $tableName (
+           |  id bigint,
+           |  name string,
+           |  ts string,
+           |  dt string
+           | )
+           | using hudi
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+           | partitioned by (dt)
+           |""".stripMargin)
+
+      spark.sql(s"alter table $tableName add partition (dt='2023-08-01')")
+      // show partitions
+      checkAnswer(s"show partitions $tableName")(Seq("dt=2023-08-01"))
+
+      // no exception, and the partition list is unchanged
+      spark.sql(s"alter table $tableName add if not exists partition (dt='2023-08-01')")
+      checkAnswer(s"show partitions $tableName")(Seq("dt=2023-08-01"))
+
+      checkExceptionContain(s"alter table $tableName add partition (dt='2023-08-01')")(
+        "Partition metadata already exists for path")
+    }
+  }
+
+  test("Add multiple partitions in one statement") {
+    withTable(generateTableName){ tableName =>
+      // create table
+      spark.sql(
+        s"""
+           | create table $tableName (
+           |  id bigint,
+           |  name string,
+           |  ts string,
+           |  dt string
+           | )
+           | using hudi
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+           | partitioned by (dt)
+           |""".stripMargin)
+
+      spark.sql(s"alter table $tableName add partition (dt='2023-08-01') partition (dt='2023-08-02')")
+      checkAnswer(s"show partitions $tableName")(Seq("dt=2023-08-01"), Seq("dt=2023-08-02"))
+
+      // IF NOT EXISTS skips the existing partition and still adds the new one
+      spark.sql(s"alter table $tableName add if not exists partition (dt='2023-08-02') partition (dt='2023-08-03')")
+      checkAnswer(s"show partitions $tableName")(
+        Seq("dt=2023-08-01"), Seq("dt=2023-08-02"), Seq("dt=2023-08-03"))
+
+      // Without IF NOT EXISTS, one existing partition fails the whole statement,
+      // so the new partition dt=2023-08-04 must not be created either
+      checkExceptionContain(
+        s"alter table $tableName add partition (dt='2023-08-04') partition (dt='2023-08-01')")(
+        "Partition metadata already exists for path")
+      checkAnswer(s"show partitions $tableName")(
+        Seq("dt=2023-08-01"), Seq("dt=2023-08-02"), Seq("dt=2023-08-03"))
+    }
+  }
+
+  Seq(false, true).foreach { hiveStyle =>
+    test(s"Add partition for single-partition table, isHiveStylePartitioning: $hiveStyle") {
+      withTable(generateTableName){ tableName =>
+        // create table
+        spark.sql(
+          s"""
+             | create table $tableName (
+             |  id bigint,
+             |  name string,
+             |  ts string,
+             |  dt string
+             | )
+             | using hudi
+             | tblproperties (
+             |  primaryKey = 'id',
+             |  preCombineField = 'ts',
+             |  hoodie.datasource.write.hive_style_partitioning = '$hiveStyle'
+             | )
+             | partitioned by (dt)
+             |""".stripMargin)
+
+        spark.sql(s"alter table $tableName add partition (dt='2023-08-01')")
+
+        // show partitions
+        checkAnswer(s"show partitions $tableName")(
+          if (hiveStyle) Seq("dt=2023-08-01") else Seq("2023-08-01")
+        )
+      }
+    }
+
+    test(s"Add partition for multi-level partitioned table, isHiveStylePartitioning: $hiveStyle") {
+      withTable(generateTableName){ tableName =>
+        // create table
+        spark.sql(
+          s"""
+             | create table $tableName (
+             |  id bigint,
+             |  name string,
+             |  ts string,
+             |  year string,
+             |  month string,
+             |  day string
+             | )
+             | using hudi
+             | tblproperties (
+             |  primaryKey = 'id',
+             |  preCombineField = 'ts',
+             |  hoodie.datasource.write.hive_style_partitioning = '$hiveStyle'
+             | )
+             | partitioned by (year, month, day)
+             |""".stripMargin)
+
+        spark.sql(s"alter table $tableName add partition (year='2023', month='08', day='01')")
+
+        // show partitions
+        checkAnswer(s"show partitions $tableName")(
+          if (hiveStyle) Seq("year=2023/month=08/day=01") else Seq("2023/08/01")
+        )
+      }
+    }
+  }
+
+  Seq(false, true).foreach { urlEncode =>
+    test(s"Add partition for single-partition table, urlEncode: $urlEncode") {
+      withTable(generateTableName){ tableName =>
+        // create table
+        spark.sql(
+          s"""
+             | create table $tableName (
+             |  id bigint,
+             |  name string,
+             |  ts string,
+             |  p_a string
+             | )
+             | using hudi
+             | tblproperties (
+             |  primaryKey = 'id',
+             |  preCombineField = 'ts',
+             |  hoodie.datasource.write.partitionpath.urlencode = '$urlEncode'
+             | )
+             | partitioned by (p_a)
+             |""".stripMargin)
+
+        spark.sql(s"alter table $tableName add partition (p_a='url%a')")
+
+        // show partitions
+        checkAnswer(s"show partitions $tableName")(
+          if (urlEncode) Seq("p_a=url%25a") else Seq("p_a=url%a")
+        )
+      }
+    }
+
+    test(s"Add partition for multi-level partitioned table, urlEncode: $urlEncode") {
+      withTable(generateTableName){ tableName =>
+        // create table
+        spark.sql(
+          s"""
+             | create table $tableName (
+             |  id bigint,
+             |  name string,
+             |  ts string,
+             |  p_a string,
+             |  p_b string
+             | )
+             | using hudi
+             | tblproperties (
+             |  primaryKey = 'id',
+             |  preCombineField = 'ts',
+             |  hoodie.datasource.write.partitionpath.urlencode = '$urlEncode'
+             | )
+             | partitioned by (p_a, p_b)
+             |""".stripMargin)
+
+        spark.sql(s"alter table $tableName add partition (p_a='url%a', p_b='key=val')")
+
+        // show partitions
+        checkAnswer(s"show partitions $tableName")(
+          if (urlEncode) Seq("p_a=url%25a/p_b=key%3Dval") else Seq("p_a=url%a/p_b=key=val")
+        )
+      }
+    }
+  }
+}