This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/branch-0.x by this push:
     new c7d68b3abcd [HUDI-7914] Fix replacecommit incorrect fields (#11522)
c7d68b3abcd is described below

commit c7d68b3abcd89e798801d44f11b60d1e0d875b84
Author: Vitali Makarevich <[email protected]>
AuthorDate: Mon Jul 1 03:05:29 2024 +0200

    [HUDI-7914] Fix replacecommit incorrect fields (#11522)
---
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  6 ++----
 .../sql/hudi/common/HoodieSparkSqlTestBase.scala   |  4 ++++
 .../sql/hudi/ddl/TestAlterTableDropPartition.scala | 23 +++++++++++++++++++++-
 3 files changed, 28 insertions(+), 5 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 1a8031b9fe2..02ca533f45b 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -49,9 +49,7 @@ import org.apache.hudi.hadoop.fs.HadoopFSUtils
 import org.apache.hudi.hive.{HiveSyncConfigHolder, HiveSyncTool}
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
-import 
org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils.reconcileSchemaRequirements
-import org.apache.hudi.internal.schema.utils.{AvroSchemaEvolutionUtils, 
SerDeHelper}
-import org.apache.hudi.keygen.constant.KeyGeneratorType
+import org.apache.hudi.internal.schema.utils.SerDeHelper
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
 import org.apache.hudi.keygen.{BaseKeyGenerator, 
TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
 import org.apache.hudi.metrics.Metrics
@@ -402,7 +400,7 @@ class HoodieSparkSqlWriterInternal {
             }
 
             // Issue the delete.
-            val schemaStr = new 
TableSchemaResolver(tableMetaClient).getTableAvroSchema.toString
+            val schemaStr = new 
TableSchemaResolver(tableMetaClient).getTableAvroSchema(false).toString
             val client = 
hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
               schemaStr, path, tblName,
                 (parameters - 
HoodieWriteConfig.AUTO_COMMIT_ENABLE.key).asJava))
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
index b48e4f4cb1a..33f84236faf 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
@@ -50,6 +50,10 @@ class HoodieSparkSqlTestBase extends FunSuite with 
BeforeAndAfterAll {
     dir
   }
 
+  protected def getTableStoragePath(tableName: String): String = {
+    new File(sparkWareHouse, tableName).getCanonicalPath
+  }
+
   // NOTE: We need to set "spark.testing" property to make sure Spark can 
appropriately
   //       recognize environment as testing
   System.setProperty("spark.testing", "true")
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestAlterTableDropPartition.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestAlterTableDropPartition.scala
index bdaf51e9bd2..7ef473e16ca 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestAlterTableDropPartition.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestAlterTableDropPartition.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.hudi.ddl
 
+import org.apache.avro.Schema
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.avro.model.{HoodieCleanMetadata, 
HoodieCleanPartitionMetadata}
 import org.apache.hudi.common.model.{HoodieCleaningPolicy, 
HoodieCommitMetadata}
@@ -26,15 +27,31 @@ import org.apache.hudi.config.{HoodieCleanConfig, 
HoodieWriteConfig}
 import org.apache.hudi.keygen.{ComplexKeyGenerator, SimpleKeyGenerator}
 import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
 import org.apache.hudi.{HoodieCLIUtils, HoodieSparkUtils}
-
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
 import 
org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.getLastCleanMetadata
 import org.junit.jupiter.api.Assertions
 import org.junit.jupiter.api.Assertions.assertTrue
 
+import scala.collection.JavaConverters._
+
 class TestAlterTableDropPartition extends HoodieSparkSqlTestBase {
 
+  private def ensureLastCommitIncludesProperSchema(path: String, 
expectedSchema: Seq[String]): Unit = {
+    val metaClient = createMetaClient(spark, path)
+    // A bit weird way to extract schema, but there is no way to get it 
exactly as is, since once `includeMetadataFields`
+    // is used - it will use custom logic to forcefully add/remove fields.
+    // And available public methods does not allow to specify exact instant to 
get schema from, only latest after some filtering
+    // which may lead to false positives in test scenarios.
+    val lastInstant = 
metaClient.getActiveTimeline.getCompletedReplaceTimeline.lastInstant().get()
+    val commitMetadata = 
HoodieCommitMetadata.fromBytes(metaClient.getActiveTimeline.getInstantDetails(lastInstant).get(),
 classOf[HoodieCommitMetadata])
+    val schemaStr = commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY)
+    val schema = new Schema.Parser().parse(schemaStr)
+    val fields = schema.getFields.asScala.map(_.name())
+
+    assert(expectedSchema == fields, s"Commit metadata should include no meta 
fields, received $fields")
+  }
+
   test("Drop non-partitioned table") {
     val tableName = generateTableName
     // create table
@@ -624,6 +641,7 @@ class TestAlterTableDropPartition extends 
HoodieSparkSqlTestBase {
     withTempDir { tmp =>
       Seq("cow", "mor").foreach { tableType =>
         val tableName = generateTableName
+        val schemaFields = Seq("id", "name", "price", "ts", 
"partition_date_col")
         spark.sql(
           s"""
              |create table $tableName (
@@ -648,6 +666,9 @@ class TestAlterTableDropPartition extends 
HoodieSparkSqlTestBase {
           Seq("partition_date_col=2023-09-01")
         )
         spark.sql(s"alter table $tableName drop 
partition(partition_date_col='2023-08-*')")
+        // Since incremental query utilizes a bit different schema read 
scenario, if `replacecommit` is written with
+        // META fields, read fails because of duplicated META columns.
+        
ensureLastCommitIncludesProperSchema(s"${tmp.getCanonicalPath}/$tableName", 
schemaFields)
         // show partitions will still return all partitions for tests, use 
select distinct as a stop-gap
         checkAnswer(s"select distinct partition_date_col from $tableName")(
           Seq("2023-09-01")

Reply via email to