This is an automated email from the ASF dual-hosted git repository.
zhiwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 59ff842 [HUDI-2223] Fix Alter Partitioned Table Failed (#3350)
59ff842 is described below
commit 59ff8423f96055fedc7a8ebf0db08666ae725726
Author: pengzhiwei <[email protected]>
AuthorDate: Tue Jul 27 20:01:04 2021 +0800
[HUDI-2223] Fix Alter Partitioned Table Failed (#3350)
---
.../AlterHoodieTableAddColumnsCommand.scala | 15 ++++++++---
.../AlterHoodieTableChangeColumnCommand.scala | 14 +++++++---
.../org/apache/spark/sql/hudi/TestAlterTable.scala | 30 ++++++++++++++++++++++
3 files changed, 53 insertions(+), 6 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala
index b513034..c5a6f84 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala
@@ -18,7 +18,6 @@
package org.apache.spark.sql.hudi.command
import java.nio.charset.StandardCharsets
-
import org.apache.avro.Schema
import org.apache.hudi.common.model.{HoodieCommitMetadata, WriteOperationType}
import org.apache.hudi.common.table.HoodieTableMetaClient
@@ -30,10 +29,11 @@ import org.apache.hudi.table.HoodieSparkTable
import scala.collection.JavaConverters._
import org.apache.hudi.{AvroConversionUtils, DataSourceUtils, HoodieWriterUtils}
import org.apache.spark.api.java.JavaSparkContext
-import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.execution.command.{DDLUtils, RunnableCommand}
+import org.apache.spark.sql.hudi.HoodieSqlUtils
import org.apache.spark.sql.hudi.HoodieSqlUtils.getTableLocation
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.util.SchemaUtils
@@ -50,7 +50,15 @@ case class AlterHoodieTableAddColumnsCommand(
override def run(sparkSession: SparkSession): Seq[Row] = {
if (colsToAdd.nonEmpty) {
+ val resolver = sparkSession.sessionState.conf.resolver
val table = sparkSession.sessionState.catalog.getTableMetadata(tableId)
+ val existsColumns =
+ colsToAdd.map(_.name).filter(col => table.schema.fieldNames.exists(f => resolver(f, col)))
+
+ if (existsColumns.nonEmpty) {
+ throw new AnalysisException(s"Columns: [${existsColumns.mkString(",")}] already exists in the table," +
+ s" table columns is: [${HoodieSqlUtils.removeMetaFields(table.schema).fieldNames.mkString(",")}]")
+ }
// Get the new schema
val newSqlSchema = StructType(table.schema.fields ++ colsToAdd)
val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tableId.table)
@@ -60,7 +68,8 @@ case class AlterHoodieTableAddColumnsCommand(
AlterHoodieTableAddColumnsCommand.commitWithSchema(newSchema, table, sparkSession)
// Refresh the new schema to meta
- refreshSchemaInMeta(sparkSession, table, newSqlSchema)
+ val newDataSchema = StructType(table.dataSchema.fields ++ colsToAdd)
+ refreshSchemaInMeta(sparkSession, table, newDataSchema)
}
Seq.empty[Row]
}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala
index 78334fd..d38b98c 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala
@@ -49,13 +49,21 @@ case class AlterHoodieTableChangeColumnCommand(
}
// Get the new schema
val newSqlSchema = StructType(
- table.dataSchema.fields.map { field =>
+ table.schema.fields.map { field =>
if (resolver(field.name, columnName)) {
newColumn
} else {
field
}
})
+ val newDataSchema = StructType(
+ table.dataSchema.fields.map { field =>
+ if (resolver(field.name, columnName)) {
+ newColumn
+ } else {
+ field
+ }
+ })
val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tableName.table)
val newSchema = AvroConversionUtils.convertStructTypeToAvroSchema(newSqlSchema, structName, nameSpace)
@@ -76,8 +84,8 @@ case class AlterHoodieTableChangeColumnCommand(
log.warn(s"Exception when attempting to uncache table ${tableName.quotedString}", e)
}
sparkSession.catalog.refreshTable(tableName.unquotedString)
- // Change the schema in the meta
- catalog.alterTableDataSchema(tableName, newSqlSchema)
+ // Change the schema in the meta using new data schema.
+ catalog.alterTableDataSchema(tableName, newDataSchema)
Seq.empty[Row]
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala
index ee73823..e1bc4a1 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala
@@ -115,6 +115,36 @@ class TestAlterTable extends TestHoodieSqlBase {
checkAnswer(s"select id, name, price, ts, ext0 from $newTableName")(
Seq(2, "a2", 10.0, 1000, null)
)
+
+ val partitionedTable = generateTableName
+ spark.sql(
+ s"""
+ |create table $partitionedTable (
+ | id int,
+ | name string,
+ | price double,
+ | ts long,
+ | dt string
+ |) using hudi
+ | location '${tmp.getCanonicalPath}/$partitionedTable'
+ | options (
+ | type = '$tableType',
+ | primaryKey = 'id',
+ | preCombineField = 'ts'
+ | )
+ | partitioned by (dt)
+ """.stripMargin)
+ spark.sql(s"insert into $partitionedTable values(1, 'a1', 10, 1000, '2021-07-25')")
+ spark.sql(s"alter table $partitionedTable add columns(ext0 double)")
+ checkAnswer(s"select id, name, price, ts, dt, ext0 from $partitionedTable")(
+ Seq(1, "a1", 10.0, 1000, "2021-07-25", null)
+ )
+
+ spark.sql(s"insert into $partitionedTable values(2, 'a2', 10, 1000, 1, '2021-07-25')");
+ checkAnswer(s"select id, name, price, ts, dt, ext0 from $partitionedTable order by id")(
+ Seq(1, "a1", 10.0, 1000, "2021-07-25", null),
+ Seq(2, "a2", 10.0, 1000, "2021-07-25", 1.0)
+ )
}
}
}