This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a commit to branch release-0.12.2-shadow in repository https://gitbox.apache.org/repos/asf/hudi.git
commit a56c00e767f5bd64beb533263ac4b9dee3c305c1 Author: KnightChess <[email protected]> AuthorDate: Tue Dec 13 23:02:09 2022 +0800 [HUDI-4113] Fix cannot parse <null> schema when use spark delete sql (#5610) - Alter table drop partition will not add schema to instant. If using delete sql, will get latest instant to get schema, which is "". This PR fixes the parsing of null or empty schema. Co-authored-by: Sagar Sumit <[email protected]> --- .../apache/hudi/client/BaseHoodieWriteClient.java | 4 +- .../org/apache/hudi/HoodieSparkSqlWriter.scala | 5 +- .../sql/hudi/TestAlterTableDropPartition.scala | 62 +++++++++++++++++++++- 3 files changed, 68 insertions(+), 3 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index 011235e4369..56ad47c9453 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -59,6 +59,7 @@ import org.apache.hudi.common.util.CleanerUtils; import org.apache.hudi.common.util.ClusteringUtils; import org.apache.hudi.common.util.CommitUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieArchivalConfig; @@ -1513,7 +1514,8 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K, if (lastInstant.isPresent()) { HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( activeTimeline.getInstantDetails(lastInstant.get()).get(), HoodieCommitMetadata.class); - if (commitMetadata.getExtraMetadata().containsKey(SCHEMA_KEY)) { + String extraSchema = commitMetadata.getExtraMetadata().get(SCHEMA_KEY); + if (!StringUtils.isNullOrEmpty(extraSchema)) { config.setSchema(commitMetadata.getExtraMetadata().get(SCHEMA_KEY)); } else { throw new HoodieIOException("Latest commit does not have any schema in commit metadata"); diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index d8f9ab8435b..b358ae1d2d0 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -240,8 +240,11 @@ object HoodieSparkSqlWriter { } // Create a HoodieWriteClient & issue the delete. + val tableMetaClient = HoodieTableMetaClient.builder + .setConf(sparkContext.hadoopConfiguration).setBasePath(basePath.toString).build() + val schemaStr = new TableSchemaResolver(tableMetaClient).getTableAvroSchema.toString val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, - null, path, tblName, + schemaStr, path, tblName, mapAsJavaMap(parameters - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key))) .asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] // Issue delete partitions diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala index e063f67d8c0..15b14ec77f5 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala @@ -19,10 +19,13 @@ package org.apache.spark.sql.hudi import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.HoodieSparkUtils -import org.apache.hudi.common.util.PartitionPathEncodeUtils +import org.apache.hudi.common.model.HoodieCommitMetadata +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.util.{PartitionPathEncodeUtils, StringUtils} import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.keygen.{ComplexKeyGenerator, SimpleKeyGenerator} import org.apache.spark.sql.SaveMode +import org.junit.jupiter.api.Assertions class TestAlterTableDropPartition extends HoodieSparkSqlTestBase { @@ -336,4 +339,61 @@ class TestAlterTableDropPartition extends HoodieSparkSqlTestBase { } } } + + test("check instance schema") { + withTempDir { tmp => + Seq("cow", "mor").foreach { tableType => + val tableName = generateTableName + // create table + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long, + | dt string + |) using hudi + | location '${tmp.getCanonicalPath}/$tableName' + | tblproperties ( + | type = '$tableType', + | primaryKey = 'id', + | preCombineField = 'ts' + | ) + | partitioned by (dt) + """.stripMargin) + // insert data to table + spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000, '01'), " + + s"(2, 'a2', 10, 1000, '02'), (3, 'a3', 10, 1000, '03')") + checkAnswer(s"select id, name, price, ts, dt from $tableName")( + Seq(1, "a1", 10.0, 1000, "01"), + Seq(2, "a2", 10.0, 1000, "02"), + Seq(3, "a3", 10.0, 1000, "03") + ) + + // drop partition + spark.sql(s"alter table $tableName drop partition (dt = '01')") + checkAnswer(s"select id, name, price, ts, dt from $tableName")( + Seq(2, "a2", 10.0, 1000, "02"), + Seq(3, "a3", 10.0, 1000, "03") + ) + + // check schema + val hadoopConf = spark.sessionState.newHadoopConf() + val metaClient = HoodieTableMetaClient.builder().setBasePath(s"${tmp.getCanonicalPath}/$tableName") + .setConf(hadoopConf).build() + val lastInstant = metaClient.getActiveTimeline.lastInstant() + val commitMetadata = HoodieCommitMetadata.fromBytes(metaClient.getActiveTimeline.getInstantDetails( + lastInstant.get()).get(), classOf[HoodieCommitMetadata]) + val schemaStr = commitMetadata.getExtraMetadata.get(HoodieCommitMetadata.SCHEMA_KEY) + Assertions.assertFalse(StringUtils.isNullOrEmpty(schemaStr)) + + // delete + spark.sql(s"delete from $tableName where dt = '02'") + checkAnswer(s"select id, name, price, ts, dt from $tableName")( + Seq(3, "a3", 10, 1000, "03") + ) + } + } + } }
