This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a commit to branch release-0.12.2-shadow in repository https://gitbox.apache.org/repos/asf/hudi.git
commit e63a2bc189dfddd80157519a75b61d3d0519b9c2
Author: 苏承祥 <[email protected]>
AuthorDate: Mon Dec 12 16:50:02 2022 +0800

    [HUDI-5372] Fix NPE caused by alter table add column. (#7236)
---
 .../AlterHoodieTableAddColumnsCommand.scala        |  2 +
 .../org/apache/spark/sql/hudi/TestAlterTable.scala | 84 ++++++++++++++++++++++
 2 files changed, 86 insertions(+)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala
index 1d65670f6d3..9852f296d33 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala
@@ -112,7 +112,9 @@ object AlterHoodieTableAddColumnsCommand {
 
     val commitActionType = CommitUtils.getCommitActionType(WriteOperationType.ALTER_SCHEMA, hoodieCatalogTable.tableType)
     val instantTime = HoodieActiveTimeline.createNewInstantTime
+
     client.startCommitWithTime(instantTime, commitActionType)
+    client.preWrite(instantTime, WriteOperationType.ALTER_SCHEMA, hoodieCatalogTable.metaClient)
 
     val hoodieTable = HoodieSparkTable.create(client.getConfig, client.getEngineContext)
     val timeLine = hoodieTable.getActiveTimeline
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala
index f300df34aaf..bf0ce86ed00 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala
@@ -241,4 +241,88 @@ class TestAlterTable extends HoodieSparkSqlTestBase {
       }
     }
   }
+  test("Test Alter Table With OCC") {
+    withTempDir { tmp =>
+      Seq("cow", "mor").foreach { tableType =>
+        val tableName = generateTableName
+        val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+        // Create table
+        spark.sql(
+          s"""
+             |create table $tableName (
+             | id int,
+             | name string,
+             | price double,
+             | ts long
+             |) using hudi
+             | location '$tablePath'
+             | tblproperties (
+             | type = '$tableType',
+             | primaryKey = 'id',
+             | preCombineField = 'ts',
+             | hoodie.write.concurrency.mode='optimistic_concurrency_control',
+             | hoodie.cleaner.policy.failed.writes='LAZY',
+             | hoodie.write.lock.provider='org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider'
+             | )
+       """.stripMargin)
+
+        // change column comment
+        spark.sql(s"alter table $tableName change column id id int comment 'primary id'")
+        var catalogTable = spark.sessionState.catalog.getTableMetadata(new TableIdentifier(tableName))
+        assertResult("primary id") (
+          catalogTable.schema(catalogTable.schema.fieldIndex("id")).getComment().get
+        )
+        spark.sql(s"alter table $tableName change column name name string comment 'name column'")
+        spark.sessionState.catalog.refreshTable(new TableIdentifier(tableName))
+        catalogTable = spark.sessionState.catalog.getTableMetadata(new TableIdentifier(tableName))
+        assertResult("primary id") (
+          catalogTable.schema(catalogTable.schema.fieldIndex("id")).getComment().get
+        )
+        assertResult("name column") (
+          catalogTable.schema(catalogTable.schema.fieldIndex("name")).getComment().get
+        )
+
+        // alter table name.
+        val newTableName = s"${tableName}_1"
+        spark.sql(s"alter table $tableName rename to $newTableName")
+        assertResult(false)(
+          spark.sessionState.catalog.tableExists(new TableIdentifier(tableName))
+        )
+        assertResult(true) (
+          spark.sessionState.catalog.tableExists(new TableIdentifier(newTableName))
+        )
+
+        val hadoopConf = spark.sessionState.newHadoopConf()
+        val metaClient = HoodieTableMetaClient.builder().setBasePath(tablePath)
+          .setConf(hadoopConf).build()
+        assertResult(newTableName) (metaClient.getTableConfig.getTableName)
+
+        // insert some data
+        spark.sql(s"insert into $newTableName values(1, 'a1', 10, 1000)")
+
+        // add column
+        spark.sql(s"alter table $newTableName add columns(ext0 string)")
+        catalogTable = spark.sessionState.catalog.getTableMetadata(new TableIdentifier(newTableName))
+        assertResult(Seq("id", "name", "price", "ts", "ext0")) {
+          HoodieSqlCommonUtils.removeMetaFields(catalogTable.schema).fields.map(_.name)
+        }
+        checkAnswer(s"select id, name, price, ts, ext0 from $newTableName")(
+          Seq(1, "a1", 10.0, 1000, null)
+        )
+
+        // change column's data type
+        checkExceptionContain(s"alter table $newTableName change column id id bigint") (
+          "ALTER TABLE CHANGE COLUMN is not supported for changing column 'id'" +
+            " with type 'IntegerType' to 'id' with type 'LongType'"
+        )
+
+        // Insert data to the new table.
+        spark.sql(s"insert into $newTableName values(2, 'a2', 12, 1000, 'e0')")
+        checkAnswer(s"select id, name, price, ts, ext0 from $newTableName")(
+          Seq(1, "a1", 10.0, 1000, null),
+          Seq(2, "a2", 12.0, 1000, "e0")
+        )
+      }
+    }
+  }
 }
