This is an automated email from the ASF dual-hosted git repository.

zhiwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 59ff842  [HUDI-2223] Fix Alter Partitioned Table Failed (#3350)
59ff842 is described below

commit 59ff8423f96055fedc7a8ebf0db08666ae725726
Author: pengzhiwei <[email protected]>
AuthorDate: Tue Jul 27 20:01:04 2021 +0800

    [HUDI-2223] Fix Alter Partitioned Table Failed (#3350)
---
 .../AlterHoodieTableAddColumnsCommand.scala        | 15 ++++++++---
 .../AlterHoodieTableChangeColumnCommand.scala      | 14 +++++++---
 .../org/apache/spark/sql/hudi/TestAlterTable.scala | 30 ++++++++++++++++++++++
 3 files changed, 53 insertions(+), 6 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala
index b513034..c5a6f84 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.hudi.command
 
 import java.nio.charset.StandardCharsets
-
 import org.apache.avro.Schema
 import org.apache.hudi.common.model.{HoodieCommitMetadata, WriteOperationType}
 import org.apache.hudi.common.table.HoodieTableMetaClient
@@ -30,10 +29,11 @@ import org.apache.hudi.table.HoodieSparkTable
 import scala.collection.JavaConverters._
 import org.apache.hudi.{AvroConversionUtils, DataSourceUtils, HoodieWriterUtils}
 import org.apache.spark.api.java.JavaSparkContext
-import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.execution.command.{DDLUtils, RunnableCommand}
+import org.apache.spark.sql.hudi.HoodieSqlUtils
 import org.apache.spark.sql.hudi.HoodieSqlUtils.getTableLocation
 import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.sql.util.SchemaUtils
@@ -50,7 +50,15 @@ case class AlterHoodieTableAddColumnsCommand(
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     if (colsToAdd.nonEmpty) {
+      val resolver = sparkSession.sessionState.conf.resolver
       val table = sparkSession.sessionState.catalog.getTableMetadata(tableId)
+      val existsColumns =
+        colsToAdd.map(_.name).filter(col => table.schema.fieldNames.exists(f => resolver(f, col)))
+
+      if (existsColumns.nonEmpty) {
+        throw new AnalysisException(s"Columns: [${existsColumns.mkString(",")}] already exists in the table," +
+          s" table columns is: [${HoodieSqlUtils.removeMetaFields(table.schema).fieldNames.mkString(",")}]")
+      }
       // Get the new schema
       val newSqlSchema = StructType(table.schema.fields ++ colsToAdd)
       val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tableId.table)
@@ -60,7 +68,8 @@ case class AlterHoodieTableAddColumnsCommand(
       AlterHoodieTableAddColumnsCommand.commitWithSchema(newSchema, table, sparkSession)
 
       // Refresh the new schema to meta
-      refreshSchemaInMeta(sparkSession, table, newSqlSchema)
+      val newDataSchema = StructType(table.dataSchema.fields ++ colsToAdd)
+      refreshSchemaInMeta(sparkSession, table, newDataSchema)
     }
     Seq.empty[Row]
   }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala
index 78334fd..d38b98c 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala
@@ -49,13 +49,21 @@ case class AlterHoodieTableChangeColumnCommand(
     }
     // Get the new schema
     val newSqlSchema = StructType(
-      table.dataSchema.fields.map { field =>
+      table.schema.fields.map { field =>
       if (resolver(field.name, columnName)) {
         newColumn
       } else {
         field
       }
     })
+    val newDataSchema = StructType(
+      table.dataSchema.fields.map { field =>
+        if (resolver(field.name, columnName)) {
+          newColumn
+        } else {
+          field
+        }
+      })
     val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tableName.table)
     val newSchema = AvroConversionUtils.convertStructTypeToAvroSchema(newSqlSchema, structName, nameSpace)
 
@@ -76,8 +84,8 @@ case class AlterHoodieTableChangeColumnCommand(
         log.warn(s"Exception when attempting to uncache table ${tableName.quotedString}", e)
     }
     sparkSession.catalog.refreshTable(tableName.unquotedString)
-    // Change the schema in the meta
-    catalog.alterTableDataSchema(tableName, newSqlSchema)
+    // Change the schema in the meta using new data schema.
+    catalog.alterTableDataSchema(tableName, newDataSchema)
 
     Seq.empty[Row]
   }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala
index ee73823..e1bc4a1 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala
@@ -115,6 +115,36 @@ class TestAlterTable extends TestHoodieSqlBase {
         checkAnswer(s"select id, name, price, ts, ext0 from $newTableName")(
           Seq(2, "a2", 10.0, 1000, null)
         )
+
+        val partitionedTable = generateTableName
+        spark.sql(
+          s"""
+             |create table $partitionedTable (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  ts long,
+             |  dt string
+             |) using hudi
+             | location '${tmp.getCanonicalPath}/$partitionedTable'
+             | options (
+             |  type = '$tableType',
+             |  primaryKey = 'id',
+             |  preCombineField = 'ts'
+             | )
+             | partitioned by (dt)
+       """.stripMargin)
+        spark.sql(s"insert into $partitionedTable values(1, 'a1', 10, 1000, '2021-07-25')")
+        spark.sql(s"alter table $partitionedTable add columns(ext0 double)")
+        checkAnswer(s"select id, name, price, ts, dt, ext0 from $partitionedTable")(
+          Seq(1, "a1", 10.0, 1000, "2021-07-25", null)
+        )
+
+        spark.sql(s"insert into $partitionedTable values(2, 'a2', 10, 1000, 1, '2021-07-25')");
+        checkAnswer(s"select id, name, price, ts, dt, ext0 from $partitionedTable order by id")(
+          Seq(1, "a1", 10.0, 1000, "2021-07-25", null),
+          Seq(2, "a2", 10.0, 1000, "2021-07-25", 1.0)
+        )
       }
     }
   }

Reply via email to