This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch release-0.14.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 5f4bcc8f434bc5646fee007732605beea4f66644
Author: Jon Vexler <[email protected]>
AuthorDate: Tue Aug 22 23:40:08 2023 -0400

    [HUDI-6692] Don't default to bulk insert on non-pkless table if recordkey is omitted (#9444)

    If a write to a table that has a primary key omitted the record key field
    from the write options, the operation could default to bulk insert because
    the check used the pre-merge properties. The check now uses the post-merge
    properties to look up the record key field.

    ---------

    Co-authored-by: Jonathan Vexler <=>
---
 .../scala/org/apache/hudi/HoodieSparkSqlWriter.scala |  2 +-
 .../apache/hudi/functional/TestCOWDataSource.scala   | 20 ++++++++++++++++++--
 2 files changed, 19 insertions(+), 3 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 1387b3e2205..e98d72d8284 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -438,7 +438,7 @@ object HoodieSparkSqlWriter {
       operation
     } else {
       // if there is no record key and no meta fields, treat it as an append-only workload and use bulk_insert as the operation type.
-      if (!paramsWithoutDefaults.containsKey(DataSourceWriteOptions.RECORDKEY_FIELD.key())
+      if (!hoodieConfig.contains(DataSourceWriteOptions.RECORDKEY_FIELD.key())
         && !paramsWithoutDefaults.containsKey(OPERATION.key())
         && !df.schema.fieldNames.contains(HoodieRecord.RECORD_KEY_METADATA_FIELD)) {
         log.warn(s"Choosing BULK_INSERT as the operation type since auto record key generation is applicable")
         operation = WriteOperationType.BULK_INSERT
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index ad443ff87a1..bb36b9cdd27 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -26,9 +26,9 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig}
 import org.apache.hudi.common.config.TimestampKeyGeneratorConfig.{TIMESTAMP_INPUT_DATE_FORMAT, TIMESTAMP_OUTPUT_DATE_FORMAT, TIMESTAMP_TIMEZONE_FORMAT, TIMESTAMP_TYPE_FIELD}
 import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType}
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
-import org.apache.hudi.common.table.timeline.HoodieInstant
+import org.apache.hudi.common.table.timeline.{HoodieInstant, TimelineUtils}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.common.testutils.RawTripTestPayload.{deleteRecordsToStrings, recordsToStrings}
@@ -261,6 +261,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
     // this write should succeed even w/o setting any param for record key or partition path, since the table config will be re-used.
     writeToHudi(optsWithNoRepeatedTableConfig, inputDF)
     spark.read.format("org.apache.hudi").options(readOpts).load(basePath).count()
+    assertLastCommitIsUpsert()
   }

   @Test
@@ -298,6 +299,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
     // this write should succeed even though we don't set the key gen explicitly.
     writeToHudi(optsWithNoRepeatedTableConfig, inputDF)
     spark.read.format("org.apache.hudi").options(readOpts).load(basePath).count()
+    assertLastCommitIsUpsert()
   }

   @Test
@@ -334,6 +336,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
     // this write should succeed even though we set the key gen explicitly; it is the default.
     writeToHudi(optsWithNoRepeatedTableConfig, inputDF)
     spark.read.format("org.apache.hudi").options(readOpts).load(basePath).count()
+    assertLastCommitIsUpsert()
   }

   private def writeToHudi(opts: Map[String, String], df: Dataset[Row]): Unit = {
@@ -1648,6 +1651,19 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
       }
     }
   }
+
+  def assertLastCommitIsUpsert(): Unit = {
+    val metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(basePath)
+      .setConf(hadoopConf)
+      .build()
+    val timeline = metaClient.getActiveTimeline.getAllCommitsTimeline
+    val latestCommit = timeline.lastInstant()
+    assert(latestCommit.isPresent)
+    assert(latestCommit.get().isCompleted)
+    val metadata = TimelineUtils.getCommitMetadata(latestCommit.get(), timeline)
+    assert(metadata.getOperationType.equals(WriteOperationType.UPSERT))
+  }
 }

 object TestCOWDataSource {
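
For context, a minimal sketch of the scenario the first hunk fixes (not part of the commit; the table name, columns, `df`, and `basePath` are illustrative assumptions, while the option keys are Hudi's standard datasource write options):

    import org.apache.spark.sql.SaveMode

    // First write: the record key is set explicitly, so it is persisted in
    // the table config on disk.
    df.write.format("hudi")
      .option("hoodie.table.name", "trips")
      .option("hoodie.datasource.write.recordkey.field", "uuid")
      .option("hoodie.datasource.write.precombine.field", "ts")
      .mode(SaveMode.Overwrite)
      .save(basePath)

    // Second write: the record key option is omitted, but the existing table
    // config still carries it. Before this fix the writer consulted only the
    // user-supplied (pre-merge) options, saw no record key, and silently fell
    // back to BULK_INSERT; with the fix it consults the merged config, so the
    // write stays an UPSERT.
    df.write.format("hudi")
      .option("hoodie.table.name", "trips")
      .mode(SaveMode.Append)
      .save(basePath)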
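
Read in isolation, the changed condition amounts to the following (a paraphrase for readability, not the committed code; the intermediate vals are invented names, and it assumes, as in HoodieSparkSqlWriter, that `hoodieConfig` is the user options merged with the existing table config while `paramsWithoutDefaults` holds only what the caller passed on this write):

    // The BULK_INSERT fast path is taken only for a truly key-less,
    // append-only workload: no record key anywhere (options or table config),
    // no explicit operation, and no pre-existing meta fields in the dataframe.
    val noRecordKey  = !hoodieConfig.contains(DataSourceWriteOptions.RECORDKEY_FIELD.key())
    val noOperation  = !paramsWithoutDefaults.containsKey(OPERATION.key())
    val noMetaFields = !df.schema.fieldNames.contains(HoodieRecord.RECORD_KEY_METADATA_FIELD)
    if (noRecordKey && noOperation && noMetaFields) {
      operation = WriteOperationType.BULK_INSERT
    }

The new assertLastCommitIsUpsert test helper verifies the other side of that decision: after a write that omits the record key option against a keyed table, the latest completed commit on the timeline must record UPSERT, not BULK_INSERT, as its operation type.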
