This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e84cfe3d41 [HUDI-6671] Support 'alter table add partition' sql (#9408)
6e84cfe3d41 is described below

commit 6e84cfe3d41f163ab653fea0309489f3e0c76215
Author: Wechar Yu <[email protected]>
AuthorDate: Mon Aug 28 09:52:56 2023 +0800

    [HUDI-6671] Support 'alter table add partition' sql (#9408)
---
 .../spark/sql/hudi/HoodieSqlCommonUtils.scala      |  41 ++--
 .../AlterHoodieTableAddPartitionCommand.scala      |  94 +++++++++
 .../spark/sql/hudi/analysis/HoodieAnalysis.scala   |   5 +-
 .../sql/hudi/TestAlterTableAddPartition.scala      | 228 +++++++++++++++++++++
 4 files changed, 353 insertions(+), 15 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
index bad2784e1fd..d5f46936be5 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
@@ -330,23 +330,36 @@ object HoodieSqlCommonUtils extends SparkAdapterSupport {
     val allPartitionPaths = hoodieCatalogTable.getPartitionPaths
     val enableHiveStylePartitioning = 
isHiveStyledPartitioning(allPartitionPaths, table)
     val enableEncodeUrl = isUrlEncodeEnabled(allPartitionPaths, table)
-    val partitionsToDrop = normalizedSpecs.map { spec =>
-      hoodieCatalogTable.partitionFields.map { partitionColumn =>
-        val encodedPartitionValue = if (enableEncodeUrl) {
-          PartitionPathEncodeUtils.escapePathName(spec(partitionColumn))
-        } else {
-          spec(partitionColumn)
-        }
-        if (enableHiveStylePartitioning) {
-          partitionColumn + "=" + encodedPartitionValue
-        } else {
-          encodedPartitionValue
-        }
-      }.mkString("/")
-    }.mkString(",")
+    val partitionFields = hoodieCatalogTable.partitionFields
+    val partitionsToDrop = normalizedSpecs.map(
+      makePartitionPath(partitionFields, _, enableEncodeUrl, 
enableHiveStylePartitioning)
+    ).mkString(",")
     partitionsToDrop
   }
 
+  /**
+   * Builds the partition path (relative to the table base path) for one partition spec.
+   *
+   * Every entry of `partitionFields`, in order, contributes one path segment and the
+   * segments are joined with "/".
+   *
+   * @param partitionFields             the table's partition columns, in path order
+   * @param normalizedSpecs             a single normalized partition spec (column -> value);
+   *                                    a column missing from it fails via `Map.apply`
+   * @param enableEncodeUrl             escape each value with `PartitionPathEncodeUtils.escapePathName`
+   * @param enableHiveStylePartitioning render each segment as `column=value` instead of `value`
+   * @return the relative partition path, e.g. `a=1/b=2` or `1/2`
+   */
+  private def makePartitionPath(partitionFields: Seq[String],
+                                normalizedSpecs: Map[String, String],
+                                enableEncodeUrl: Boolean,
+                                enableHiveStylePartitioning: Boolean): String = {
+    // Renders the path segment for a single partition column.
+    def segment(column: String): String = {
+      val rawValue = normalizedSpecs(column)
+      val value =
+        if (enableEncodeUrl) PartitionPathEncodeUtils.escapePathName(rawValue)
+        else rawValue
+      if (enableHiveStylePartitioning) s"$column=$value" else value
+    }
+
+    partitionFields.map(segment).mkString("/")
+  }
+
+  /**
+   * Builds the partition path (relative to the table base path) for a normalized
+   * partition spec.
+   *
+   * The hive-style and URL-encoding flags come from the table's own config
+   * (`getHiveStylePartitioningEnable`, `getUrlEncodePartitioning`). They are not
+   * inferred from partition paths that already exist.
+   *
+   * @param hoodieCatalogTable the Hudi table whose partition fields and config are used
+   * @param normalizedSpecs    a single normalized partition spec (column -> value)
+   * @return the relative partition path for `normalizedSpecs`
+   */
+  def makePartitionPath(hoodieCatalogTable: HoodieCatalogTable,
+                        normalizedSpecs: Map[String, String]): String = {
+    val config = hoodieCatalogTable.tableConfig
+    // Both flags are stored as strings. parseBoolean maps null or anything other
+    // than a case-insensitive "true" to false.
+    val hiveStyle = java.lang.Boolean.parseBoolean(config.getHiveStylePartitioningEnable)
+    val urlEncode = java.lang.Boolean.parseBoolean(config.getUrlEncodePartitioning)
+
+    makePartitionPath(hoodieCatalogTable.partitionFields, normalizedSpecs, urlEncode, hiveStyle)
+  }
+
   private def validateInstant(queryInstant: String): Unit = {
     // Provided instant has to either
     //  - Match one of the bootstrapping instants
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddPartitionCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddPartitionCommand.scala
new file mode 100644
index 00000000000..e36f8d5f6f1
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddPartitionCommand.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command
+
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.HoodiePartitionMetadata
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, 
HoodieCatalogTable}
+import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{makePartitionPath, 
normalizePartitionSpec}
+
+import scala.util.control.NonFatal
+
+/**
+ * Hudi implementation of `ALTER TABLE ... ADD [IF NOT EXISTS] PARTITION ...`.
+ *
+ * For each requested partition spec, it writes Hudi partition metadata under the table
+ * base path when none exists yet. It then registers the partitions in the session
+ * catalog. Explicit partition locations are rejected.
+ *
+ * @param tableIdentifier       the target Hudi table
+ * @param partitionSpecsAndLocs partition specs paired with their optional LOCATION clause
+ * @param ifNotExists           when true, specs whose partition metadata already exists are
+ *                              skipped instead of raising an error
+ */
+case class AlterHoodieTableAddPartitionCommand(
+   tableIdentifier: TableIdentifier,
+   partitionSpecsAndLocs: Seq[(TablePartitionSpec, Option[String])],
+   ifNotExists: Boolean)
+  extends HoodieLeafRunnableCommand {
+
+  /**
+   * Validates the request, writes any missing partition metadata, and syncs the
+   * partitions to the session catalog.
+   *
+   * @return always an empty result
+   * @throws AnalysisException in any of these cases:
+   *                           - the table is not partitioned;
+   *                           - a partition location is given;
+   *                           - IF NOT EXISTS is absent and partition metadata already
+   *                             exists for a requested partition.
+   */
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    logInfo(s"start execute alter table add partition command for 
$tableIdentifier")
+
+    val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableIdentifier)
+
+    if (!hoodieCatalogTable.isPartitionedTable) {
+      throw new AnalysisException(s"$tableIdentifier is a non-partitioned 
table that is not allowed to add partition")
+    }
+
+    val catalog = sparkSession.sessionState.catalog
+    val table = hoodieCatalogTable.table
+    DDLUtils.verifyAlterTableType(catalog, table, isView = false)
+
+    // Reject LOCATION clauses and normalize each spec against the table's partition
+    // fields, using the session's identifier resolver.
+    val normalizedSpecs: Seq[Map[String, String]] = partitionSpecsAndLocs.map 
{ case (spec, location) =>
+      if (location.isDefined) {
+        throw new AnalysisException(s"Hoodie table does not support specify 
partition location explicitly")
+      }
+      normalizePartitionSpec(
+        spec,
+        hoodieCatalogTable.partitionFields,
+        hoodieCatalogTable.tableName,
+        sparkSession.sessionState.conf.resolver)
+    }
+
+    val basePath = new Path(hoodieCatalogTable.tableLocation)
+    val fileSystem = hoodieCatalogTable.metaClient.getFs
+    // One new instant time is shared by every partition metadata object this command creates.
+    val instantTime = HoodieActiveTimeline.createNewInstantTime
+    val format = hoodieCatalogTable.tableConfig.getPartitionMetafileFormat
+    // This map only probes the file system and builds objects; nothing is saved until
+    // after it completes. A spec whose metadata already exists (without IF NOT EXISTS)
+    // therefore aborts before any metadata from this call is written. This assumes
+    // `partitionSpecsAndLocs` is a strict Seq.
+    // None marks a partition whose metadata already exists; the catalog entry is
+    // still produced for it.
+    val (partitionMetadata, parts) = normalizedSpecs.map { spec =>
+      val partitionPath = makePartitionPath(hoodieCatalogTable, spec)
+      val fullPartitionPath: Path = FSUtils.getPartitionPath(basePath, 
partitionPath)
+      val metadata = if 
(HoodiePartitionMetadata.hasPartitionMetadata(fileSystem, fullPartitionPath)) {
+        if (!ifNotExists) {
+          throw new AnalysisException(s"Partition metadata already exists for 
path: $fullPartitionPath")
+        }
+        None
+      } else Some(new HoodiePartitionMetadata(fileSystem, instantTime, 
basePath, fullPartitionPath, format))
+      (metadata, CatalogTablePartition(spec, table.storage.copy(locationUri = 
Some(fullPartitionPath.toUri))))
+    }.unzip
+    // Persist metadata only for partitions that did not already have it.
+    // NOTE(review): the meaning of the `0` passed to trySave is not visible here — confirm.
+    partitionMetadata.flatten.foreach(_.trySave(0))
+
+    // Sync new partitions in batch, enable ignoreIfExists to be silent for 
existing partitions.
+    // The batch size comes from the SparkContext conf key
+    // "spark.sql.addPartitionInBatch.size" (default 100).
+    val batchSize = 
sparkSession.sparkContext.conf.getInt("spark.sql.addPartitionInBatch.size", 100)
+    // Catalog sync is best-effort. A non-fatal failure is only logged, and the partition
+    // metadata written above is kept.
+    try {
+      parts.toIterator.grouped(batchSize).foreach { batch =>
+        catalog.createPartitions(tableIdentifier, batch, ignoreIfExists = true)
+      }
+    } catch {
+      case NonFatal(e) =>
+        logWarning("Failed to add partitions in external catalog", e)
+    }
+    // Refresh the table so later queries in this session see the new partitions.
+    sparkSession.catalog.refreshTable(tableIdentifier.unquotedString)
+
+    Seq.empty[Row]
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
index 24820c1c032..eb7f00ef26b 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
@@ -518,11 +518,14 @@ case class HoodiePostAnalysisRule(sparkSession: SparkSession) extends Rule[Logic
         if sparkSession.sessionState.catalog.tableExists(tableName)
           && sparkAdapter.isHoodieTable(tableName, sparkSession) =>
         DropHoodieTableCommand(tableName, ifExists, false, purge)
+      // Rewrite the AlterTableAddPartitionCommand to 
AlterHoodieTableAddPartitionCommand
+      case AlterTableAddPartitionCommand(tableName, partitionSpecsAndLocs, 
ifNotExists)
+        if sparkAdapter.isHoodieTable(tableName, sparkSession) =>
+        AlterHoodieTableAddPartitionCommand(tableName, partitionSpecsAndLocs, 
ifNotExists)
       // Rewrite the AlterTableDropPartitionCommand to 
AlterHoodieTableDropPartitionCommand
       case AlterTableDropPartitionCommand(tableName, specs, ifExists, purge, 
retainData)
         if sparkAdapter.isHoodieTable(tableName, sparkSession) =>
           AlterHoodieTableDropPartitionCommand(tableName, specs, ifExists, 
purge, retainData)
-      // Rewrite the AlterTableRenameCommand to AlterHoodieTableRenameCommand
       // Rewrite the AlterTableAddColumnsCommand to 
AlterHoodieTableAddColumnsCommand
       case AlterTableAddColumnsCommand(tableId, colsToAdd)
         if sparkAdapter.isHoodieTable(tableId, sparkSession) =>
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableAddPartition.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableAddPartition.scala
new file mode 100644
index 00000000000..68c6c115448
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableAddPartition.scala
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+/**
+ * SQL tests for `ALTER TABLE ... ADD PARTITION` on Hudi tables. They cover:
+ *  - rejection for non-partitioned tables and explicit partition locations;
+ *  - IF NOT EXISTS handling;
+ *  - the partition paths produced with hive-style and URL-encoded partitioning.
+ */
+class TestAlterTableAddPartition extends HoodieSparkSqlTestBase {
+
+  // ADD PARTITION on a non-partitioned table must fail and leave no partitions.
+  test("Add partition for non-partitioned table") {
+    withTable(generateTableName){ tableName =>
+      // create table
+      spark.sql(
+        s"""
+           | create table $tableName (
+           |  id bigint,
+           |  name string,
+           |  ts string,
+           |  dt string
+           | )
+           | using hudi
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+           |""".stripMargin)
+
+      checkExceptionContain(s"alter table $tableName add partition 
(dt='2023-08-01')")(
+        s"`$tableName` is a non-partitioned table that is not allowed to add 
partition")
+
+      // show partitions
+      checkAnswer(s"show partitions $tableName")(Seq.empty: _*)
+    }
+  }
+
+  // An explicit LOCATION clause is rejected, and no partition is created.
+  test("Add partition with location") {
+    withTable(generateTableName){ tableName =>
+      // create table
+      spark.sql(
+        s"""
+           | create table $tableName (
+           |  id bigint,
+           |  name string,
+           |  ts string,
+           |  dt string
+           | )
+           | using hudi
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+           | partitioned by (dt)
+           |""".stripMargin)
+
+      checkExceptionContain(s"alter table $tableName add partition 
(dt='2023-08-01') location '/tmp/path'")(
+        "Hoodie table does not support specify partition location explicitly")
+
+      // show partitions
+      checkAnswer(s"show partitions $tableName")(Seq.empty: _*)
+    }
+  }
+
+  // Re-adding an existing partition succeeds only with IF NOT EXISTS.
+  test("Add partition if not exists") {
+    withTable(generateTableName){ tableName =>
+      // create table
+      spark.sql(
+        s"""
+           | create table $tableName (
+           |  id bigint,
+           |  name string,
+           |  ts string,
+           |  dt string
+           | )
+           | using hudi
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+           | partitioned by (dt)
+           |""".stripMargin)
+
+      spark.sql(s"alter table $tableName add partition (dt='2023-08-01')")
+      // show partitions
+      checkAnswer(s"show partitions $tableName")(Seq("dt=2023-08-01"))
+
+      // no exception
+      spark.sql(s"alter table $tableName add if not exists partition 
(dt='2023-08-01')")
+
+      // Without IF NOT EXISTS, the existing partition metadata causes a failure.
+      checkExceptionContain(s"alter table $tableName add partition 
(dt='2023-08-01')")(
+        "Partition metadata already exists for path")
+    }
+  }
+
+  // Each test in this loop is registered once per hive-style partitioning setting.
+  // Expected paths carry `column=` prefixes only when hive style is enabled.
+  Seq(false, true).foreach { hiveStyle =>
+    test(s"Add partition for single-partition table, isHiveStylePartitioning: 
$hiveStyle") {
+      withTable(generateTableName){ tableName =>
+        // create table
+        spark.sql(
+          s"""
+             | create table $tableName (
+             |  id bigint,
+             |  name string,
+             |  ts string,
+             |  dt string
+             | )
+             | using hudi
+             | tblproperties (
+             |  primaryKey = 'id',
+             |  preCombineField = 'ts',
+             |  hoodie.datasource.write.hive_style_partitioning = '$hiveStyle'
+             | )
+             | partitioned by (dt)
+             |""".stripMargin)
+
+        spark.sql(s"alter table $tableName add partition (dt='2023-08-01')")
+
+        // show partitions
+        checkAnswer(s"show partitions $tableName")(
+          if (hiveStyle) Seq("dt=2023-08-01") else Seq("2023-08-01")
+        )
+      }
+    }
+
+    test(s"Add partition for multi-level partitioned table, 
isHiveStylePartitioning: $hiveStyle") {
+      withTable(generateTableName){ tableName =>
+        // create table
+        spark.sql(
+          s"""
+             | create table $tableName (
+             |  id bigint,
+             |  name string,
+             |  ts string,
+             |  year string,
+             |  month string,
+             |  day string
+             | )
+             | using hudi
+             | tblproperties (
+             |  primaryKey = 'id',
+             |  preCombineField = 'ts',
+             |  hoodie.datasource.write.hive_style_partitioning = '$hiveStyle'
+             | )
+             | partitioned by (year, month, day)
+             |""".stripMargin)
+
+        spark.sql(s"alter table $tableName add partition (year='2023', 
month='08', day='01')")
+
+        // show partitions
+        // Segments follow the `partitioned by` column order.
+        checkAnswer(s"show partitions $tableName")(
+          if (hiveStyle) Seq("year=2023/month=08/day=01") else 
Seq("2023/08/01")
+        )
+      }
+    }
+  }
+
+  // Each test in this loop is registered once per URL-encoding setting. With encoding
+  // on, '%' and '=' in partition values are expected to appear as %25 and %3D.
+  // NOTE(review): the expected `p_a=` prefixes imply hive-style paths are the default
+  // for these SQL-created tables — confirm against the table defaults.
+  Seq(false, true).foreach { urlEncode =>
+    test(s"Add partition for single-partition table, urlEncode: $urlEncode") {
+      withTable(generateTableName){ tableName =>
+        // create table
+        spark.sql(
+          s"""
+             | create table $tableName (
+             |  id bigint,
+             |  name string,
+             |  ts string,
+             |  p_a string
+             | )
+             | using hudi
+             | tblproperties (
+             |  primaryKey = 'id',
+             |  preCombineField = 'ts',
+             |  hoodie.datasource.write.partitionpath.urlencode = '$urlEncode'
+             | )
+             | partitioned by (p_a)
+             |""".stripMargin)
+
+        spark.sql(s"alter table $tableName add partition (p_a='url%a')")
+
+        // show partitions
+        checkAnswer(s"show partitions $tableName")(
+          if (urlEncode) Seq("p_a=url%25a") else Seq("p_a=url%a")
+        )
+      }
+    }
+
+    test(s"Add partition for multi-level partitioned table, urlEncode: 
$urlEncode") {
+      withTable(generateTableName){ tableName =>
+        // create table
+        spark.sql(
+          s"""
+             | create table $tableName (
+             |  id bigint,
+             |  name string,
+             |  ts string,
+             |  p_a string,
+             |  p_b string
+             | )
+             | using hudi
+             | tblproperties (
+             |  primaryKey = 'id',
+             |  preCombineField = 'ts',
+             |  hoodie.datasource.write.partitionpath.urlencode = '$urlEncode'
+             | )
+             | partitioned by (p_a, p_b)
+             |""".stripMargin)
+
+        spark.sql(s"alter table $tableName add partition (p_a='url%a', 
p_b='key=val')")
+
+        // show partitions
+        checkAnswer(s"show partitions $tableName")(
+          if (urlEncode) Seq("p_a=url%25a/p_b=key%3Dval") else 
Seq("p_a=url%a/p_b=key=val")
+        )
+      }
+    }
+  }
+}

Reply via email to