This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/branch-0.x by this push:
new c7d68b3abcd [HUDI-7914] Fix replacecommit incorrect fields (#11522)
c7d68b3abcd is described below
commit c7d68b3abcd89e798801d44f11b60d1e0d875b84
Author: Vitali Makarevich <[email protected]>
AuthorDate: Mon Jul 1 03:05:29 2024 +0200
[HUDI-7914] Fix replacecommit incorrect fields (#11522)
---
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 6 ++----
.../sql/hudi/common/HoodieSparkSqlTestBase.scala | 4 ++++
.../sql/hudi/ddl/TestAlterTableDropPartition.scala | 23 +++++++++++++++++++++-
3 files changed, 28 insertions(+), 5 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 1a8031b9fe2..02ca533f45b 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -49,9 +49,7 @@ import org.apache.hudi.hadoop.fs.HadoopFSUtils
import org.apache.hudi.hive.{HiveSyncConfigHolder, HiveSyncTool}
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
-import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils.reconcileSchemaRequirements
-import org.apache.hudi.internal.schema.utils.{AvroSchemaEvolutionUtils, SerDeHelper}
-import org.apache.hudi.keygen.constant.KeyGeneratorType
+import org.apache.hudi.internal.schema.utils.SerDeHelper
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import org.apache.hudi.keygen.{BaseKeyGenerator, TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
import org.apache.hudi.metrics.Metrics
@@ -402,7 +400,7 @@ class HoodieSparkSqlWriterInternal {
}
// Issue the delete.
-    val schemaStr = new TableSchemaResolver(tableMetaClient).getTableAvroSchema.toString
+    val schemaStr = new TableSchemaResolver(tableMetaClient).getTableAvroSchema(false).toString
    val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
      schemaStr, path, tblName,
      (parameters - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key).asJava))
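For context, the behavioral difference behind this one-line fix can be sketched as follows. This is an illustration, not part of the patch: it assumes a `metaClient` already built for an existing table, and that the boolean overload of `getTableAvroSchema` toggles inclusion of the Hudi meta fields.

    import org.apache.hudi.common.table.TableSchemaResolver

    val resolver = new TableSchemaResolver(metaClient)  // metaClient: a hypothetical HoodieTableMetaClient
    // The no-argument variant can resolve a schema that carries the Hudi meta fields
    // (_hoodie_commit_time, _hoodie_record_key, ...); written into the delete path's
    // replacecommit metadata, those fields are what this fix removes.
    val withMetaFields = resolver.getTableAvroSchema.toString
    // Passing `false` requests the data schema only, which the patched line now uses.
    val dataFieldsOnly = resolver.getTableAvroSchema(false).toString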
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
index b48e4f4cb1a..33f84236faf 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
@@ -50,6 +50,10 @@ class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll {
dir
}
+ protected def getTableStoragePath(tableName: String): String = {
+ new File(sparkWareHouse, tableName).getCanonicalPath
+ }
+
// NOTE: We need to set "spark.testing" property to make sure Spark can appropriately
// recognize environment as testing
System.setProperty("spark.testing", "true")
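A hypothetical usage of the new helper, assuming a table that Spark SQL materialized under the test warehouse directory; `my_table` is made up for illustration, and `createMetaClient` is the test utility imported in the test file further below.

    // Resolve the expected on-disk location of a SQL-created test table,
    // e.g. to build a meta client against it.
    val path = getTableStoragePath("my_table")
    val metaClient = createMetaClient(spark, path)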
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestAlterTableDropPartition.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestAlterTableDropPartition.scala
index bdaf51e9bd2..7ef473e16ca 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestAlterTableDropPartition.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestAlterTableDropPartition.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.hudi.ddl
+import org.apache.avro.Schema
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.avro.model.{HoodieCleanMetadata, HoodieCleanPartitionMetadata}
import org.apache.hudi.common.model.{HoodieCleaningPolicy, HoodieCommitMetadata}
@@ -26,15 +27,31 @@ import org.apache.hudi.config.{HoodieCleanConfig, HoodieWriteConfig}
import org.apache.hudi.keygen.{ComplexKeyGenerator, SimpleKeyGenerator}
import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
import org.apache.hudi.{HoodieCLIUtils, HoodieSparkUtils}
-
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.getLastCleanMetadata
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Assertions.assertTrue
+import scala.collection.JavaConverters._
+
class TestAlterTableDropPartition extends HoodieSparkSqlTestBase {
+  private def ensureLastCommitIncludesProperSchema(path: String, expectedSchema: Seq[String]): Unit = {
+    val metaClient = createMetaClient(spark, path)
+    // A slightly roundabout way to extract the schema, but there is no way to get it exactly as stored:
+    // once `includeMetadataFields` is used, custom logic forcefully adds/removes fields, and the available
+    // public methods do not allow specifying the exact instant to read the schema from, only the latest one
+    // after some filtering, which may lead to false positives in test scenarios.
+    val lastInstant = metaClient.getActiveTimeline.getCompletedReplaceTimeline.lastInstant().get()
+    val commitMetadata = HoodieCommitMetadata.fromBytes(metaClient.getActiveTimeline.getInstantDetails(lastInstant).get(), classOf[HoodieCommitMetadata])
+    val schemaStr = commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY)
+    val schema = new Schema.Parser().parse(schemaStr)
+    val fields = schema.getFields.asScala.map(_.name())
+
+    assert(expectedSchema == fields, s"Commit metadata should include no meta fields, received $fields")
+  }
+
test("Drop non-partitioned table") {
val tableName = generateTableName
// create table
@@ -624,6 +641,7 @@ class TestAlterTableDropPartition extends HoodieSparkSqlTestBase {
withTempDir { tmp =>
Seq("cow", "mor").foreach { tableType =>
val tableName = generateTableName
+        val schemaFields = Seq("id", "name", "price", "ts", "partition_date_col")
spark.sql(
s"""
|create table $tableName (
@@ -648,6 +666,9 @@ class TestAlterTableDropPartition extends HoodieSparkSqlTestBase {
Seq("partition_date_col=2023-09-01")
)
spark.sql(s"alter table $tableName drop
partition(partition_date_col='2023-08-*')")
+        // Since the incremental query uses a slightly different schema-read path, a `replacecommit` written
+        // with META fields makes the read fail because of duplicated META columns.
+        ensureLastCommitIncludesProperSchema(s"${tmp.getCanonicalPath}/$tableName", schemaFields)
        // show partitions will still return all partitions for tests, use select distinct as a stop-gap
checkAnswer(s"select distinct partition_date_col from $tableName")(
Seq("2023-09-01")