This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.12.2-shadow
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit a56c00e767f5bd64beb533263ac4b9dee3c305c1
Author: KnightChess <[email protected]>
AuthorDate: Tue Dec 13 23:02:09 2022 +0800

    [HUDI-4113] Fix cannot parse <null> schema when using Spark delete SQL (#5610)
    
    - Alter table drop partition does not add a schema to the instant it
      writes, so a subsequent delete SQL statement that reads the schema
      from the latest instant gets "". This PR fixes the handling of a
      null or empty schema.
    
    Co-authored-by: Sagar Sumit <[email protected]>
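    
    A minimal reproduction sketch of the failure mode described above
    (hypothetical table name `t1`; before this fix, the DELETE failed
    because the latest instant carried an empty schema):
    
        spark.sql("alter table t1 drop partition (dt = '01')") // writes an instant with no schema
        spark.sql("delete from t1 where dt = '02'")            // previously: cannot parse <null> schema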
---
 .../apache/hudi/client/BaseHoodieWriteClient.java  |  4 +-
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  5 +-
 .../sql/hudi/TestAlterTableDropPartition.scala     | 62 +++++++++++++++++++++-
 3 files changed, 68 insertions(+), 3 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 011235e4369..56ad47c9453 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -59,6 +59,7 @@ import org.apache.hudi.common.util.CleanerUtils;
 import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.CommitUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieArchivalConfig;
@@ -1513,7 +1514,8 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
       if (lastInstant.isPresent()) {
         HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
             activeTimeline.getInstantDetails(lastInstant.get()).get(), HoodieCommitMetadata.class);
-        if (commitMetadata.getExtraMetadata().containsKey(SCHEMA_KEY)) {
+        String extraSchema = commitMetadata.getExtraMetadata().get(SCHEMA_KEY);
+        if (!StringUtils.isNullOrEmpty(extraSchema)) {
           config.setSchema(commitMetadata.getExtraMetadata().get(SCHEMA_KEY));
         } else {
           throw new HoodieIOException("Latest commit does not have any schema in commit metadata");
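
For context, StringUtils.isNullOrEmpty covers both failure shapes at once: a
missing SCHEMA_KEY entry (null) and the empty string written by drop-partition
instants. A minimal sketch of the guard's behavior (illustrative values only):

    import org.apache.hudi.common.util.StringUtils

    // prints false, false, true: only a real schema string passes the new guard
    Seq(null, "", "{\"type\":\"record\"}").foreach { s =>
      println(!StringUtils.isNullOrEmpty(s))
    }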
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index d8f9ab8435b..b358ae1d2d0 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -240,8 +240,11 @@ object HoodieSparkSqlWriter {
             }
 
             // Create a HoodieWriteClient & issue the delete.
+            val tableMetaClient = HoodieTableMetaClient.builder
+              .setConf(sparkContext.hadoopConfiguration).setBasePath(basePath.toString).build()
+            val schemaStr = new TableSchemaResolver(tableMetaClient).getTableAvroSchema.toString
             val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
-              null, path, tblName,
+              schemaStr, path, tblName,
               mapAsJavaMap(parameters - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)))
               .asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
             // Issue delete partitions
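
The fix above replaces the hard-coded null schema with one resolved from the
table's own metadata. A standalone sketch of the same lookup (assumes an
existing Hudi table at `basePath` and a Hadoop `Configuration` in scope):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}

    // Resolve the table's current Avro schema from its metadata rather than
    // trusting whatever the latest instant recorded.
    def resolveTableSchema(basePath: String, conf: Configuration): String = {
      val metaClient = HoodieTableMetaClient.builder()
        .setConf(conf)
        .setBasePath(basePath)
        .build()
      new TableSchemaResolver(metaClient).getTableAvroSchema.toString
    }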
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
index e063f67d8c0..15b14ec77f5 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
@@ -19,10 +19,13 @@ package org.apache.spark.sql.hudi
 
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.HoodieSparkUtils
-import org.apache.hudi.common.util.PartitionPathEncodeUtils
+import org.apache.hudi.common.model.HoodieCommitMetadata
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.util.{PartitionPathEncodeUtils, StringUtils}
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.keygen.{ComplexKeyGenerator, SimpleKeyGenerator}
 import org.apache.spark.sql.SaveMode
+import org.junit.jupiter.api.Assertions
 
 class TestAlterTableDropPartition extends HoodieSparkSqlTestBase {
 
@@ -336,4 +339,61 @@ class TestAlterTableDropPartition extends HoodieSparkSqlTestBase {
       }
     }
   }
+
+  test("check instance schema") {
+    withTempDir { tmp =>
+      Seq("cow", "mor").foreach { tableType =>
+        val tableName = generateTableName
+        // create table
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  ts long,
+             |  dt string
+             |) using hudi
+             | location '${tmp.getCanonicalPath}/$tableName'
+             | tblproperties (
+             |  type = '$tableType',
+             |  primaryKey = 'id',
+             |  preCombineField = 'ts'
+             | )
+             | partitioned by (dt)
+       """.stripMargin)
+        // insert data to table
+        spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000, '01'), " +
+          s"(2, 'a2', 10, 1000, '02'), (3, 'a3', 10, 1000, '03')")
+        checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+          Seq(1, "a1", 10.0, 1000, "01"),
+          Seq(2, "a2", 10.0, 1000, "02"),
+          Seq(3, "a3", 10.0, 1000, "03")
+        )
+
+        // drop partition
+        spark.sql(s"alter table $tableName drop partition (dt = '01')")
+        checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+          Seq(2, "a2", 10.0, 1000, "02"),
+          Seq(3, "a3", 10.0, 1000, "03")
+        )
+
+        // check schema
+        val hadoopConf = spark.sessionState.newHadoopConf()
+        val metaClient = HoodieTableMetaClient.builder().setBasePath(s"${tmp.getCanonicalPath}/$tableName")
+          .setConf(hadoopConf).build()
+        val lastInstant = metaClient.getActiveTimeline.lastInstant()
+        val commitMetadata = HoodieCommitMetadata.fromBytes(metaClient.getActiveTimeline.getInstantDetails(
+          lastInstant.get()).get(), classOf[HoodieCommitMetadata])
+        val schemaStr = commitMetadata.getExtraMetadata.get(HoodieCommitMetadata.SCHEMA_KEY)
+        Assertions.assertFalse(StringUtils.isNullOrEmpty(schemaStr))
+
+        // delete
+        spark.sql(s"delete from $tableName where dt = '02'")
+        checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+          Seq(3, "a3", 10.0, 1000, "03")
+        )
+      }
+    }
+  }
 }
