This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch release-0.14.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 5f4bcc8f434bc5646fee007732605beea4f66644
Author: Jon Vexler <[email protected]>
AuthorDate: Tue Aug 22 23:40:08 2023 -0400

    [HUDI-6692] Don't default to bulk insert on a table with a primary key if recordkey is omitted (#9444)
    
    - If a write to a table with a primary key omitted the recordkey field from its options, it could default to bulk insert because the check used the pre-merge properties. The check now uses the post-merge properties for the recordkey field.
    
    ---------
    
    Co-authored-by: Jonathan Vexler <=>
---
 .../scala/org/apache/hudi/HoodieSparkSqlWriter.scala |  2 +-
 .../apache/hudi/functional/TestCOWDataSource.scala   | 20 ++++++++++++++++++--
 2 files changed, 19 insertions(+), 3 deletions(-)
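
For context, a minimal sketch of the scenario this patch addresses (the table
name, schema fields, and path are illustrative assumptions, not taken from the
patch): a first write configures a record key, and a later write omits it from
the options; before this fix, the later write could silently switch from the
default UPSERT to BULK_INSERT.

    import org.apache.hudi.DataSourceWriteOptions
    import org.apache.spark.sql.SaveMode

    // Assumes a SparkSession `spark` and a DataFrame `df` with "uuid" and "ts" columns.
    val basePath = "/tmp/hudi_table" // illustrative path

    // First write: the record key is supplied and persisted into the table config.
    df.write.format("hudi")
      .option("hoodie.table.name", "test_table")
      .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(), "uuid")
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), "ts")
      .mode(SaveMode.Overwrite)
      .save(basePath)

    // Later write: the record key option is omitted, but it is still present in
    // the merged (table config + write options) properties, so the operation
    // should stay UPSERT rather than falling back to BULK_INSERT.
    df.write.format("hudi")
      .option("hoodie.table.name", "test_table")
      .mode(SaveMode.Append)
      .save(basePath)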

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 1387b3e2205..e98d72d8284 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -438,7 +438,7 @@ object HoodieSparkSqlWriter {
       operation
     } else {
      // if no record key, and no meta fields, we should treat it as append only workload and make bulk_insert as operation type.
-      if (!paramsWithoutDefaults.containsKey(DataSourceWriteOptions.RECORDKEY_FIELD.key())
+      if (!hoodieConfig.contains(DataSourceWriteOptions.RECORDKEY_FIELD.key())
        && !paramsWithoutDefaults.containsKey(OPERATION.key()) && !df.schema.fieldNames.contains(HoodieRecord.RECORD_KEY_METADATA_FIELD)) {
        log.warn(s"Choosing BULK_INSERT as the operation type since auto record key generation is applicable")
         operation = WriteOperationType.BULK_INSERT
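
The one-line change above swaps the lookup source for the record key:
paramsWithoutDefaults holds only the options passed on the current write, while
hoodieConfig holds the post-merge properties (existing table config plus write
options). A conceptual sketch of the difference, using plain Scala Maps as
stand-ins for the real config objects (values are illustrative):

    val recordKeyKey = "hoodie.datasource.write.recordkey.field"
    val tableConfig  = Map(recordKeyKey -> "uuid") // persisted when the table was created
    val writeOptions = Map.empty[String, String]   // the current write omits the record key

    val merged = tableConfig ++ writeOptions       // roughly what hoodieConfig sees

    writeOptions.contains(recordKeyKey) // false -> the old check could pick BULK_INSERT
    merged.contains(recordKeyKey)       // true  -> the new check keeps the default UPSERT
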
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index ad443ff87a1..bb36b9cdd27 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -26,9 +26,9 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig}
 import org.apache.hudi.common.config.TimestampKeyGeneratorConfig.{TIMESTAMP_INPUT_DATE_FORMAT, TIMESTAMP_OUTPUT_DATE_FORMAT, TIMESTAMP_TIMEZONE_FORMAT, TIMESTAMP_TYPE_FIELD}
 import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType}
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
-import org.apache.hudi.common.table.timeline.HoodieInstant
+import org.apache.hudi.common.table.timeline.{HoodieInstant, TimelineUtils}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.common.testutils.RawTripTestPayload.{deleteRecordsToStrings, recordsToStrings}
@@ -261,6 +261,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
     // this write should succeed even w/o setting any param for record key, partition path since table config will be re-used.
     writeToHudi(optsWithNoRepeatedTableConfig, inputDF)
     spark.read.format("org.apache.hudi").options(readOpts).load(basePath).count()
+    assertLastCommitIsUpsert()
   }
 
   @Test
@@ -298,6 +299,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
     // this write should succeed even though we don't set key gen explicitly.
     writeToHudi(optsWithNoRepeatedTableConfig, inputDF)
     spark.read.format("org.apache.hudi").options(readOpts).load(basePath).count()
+    assertLastCommitIsUpsert()
   }
 
   @Test
@@ -334,6 +336,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
     // this write should succeed even though we set key gen explicitly; it's the default.
     writeToHudi(optsWithNoRepeatedTableConfig, inputDF)
     spark.read.format("org.apache.hudi").options(readOpts).load(basePath).count()
+    assertLastCommitIsUpsert()
   }
 
   private def writeToHudi(opts: Map[String, String], df: Dataset[Row]): Unit = {
@@ -1648,6 +1651,19 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
       }
     }
   }
+
+  def assertLastCommitIsUpsert(): Unit = {
+    val metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(basePath)
+      .setConf(hadoopConf)
+      .build()
+    val timeline = metaClient.getActiveTimeline.getAllCommitsTimeline
+    val latestCommit = timeline.lastInstant()
+    assert(latestCommit.isPresent)
+    assert(latestCommit.get().isCompleted)
+    val metadata = TimelineUtils.getCommitMetadata(latestCommit.get(), timeline)
+    assert(metadata.getOperationType.equals(WriteOperationType.UPSERT))
+  }
 }
 
 object TestCOWDataSource {
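
As a usage note, the new assertLastCommitIsUpsert helper mirrors a check that
can be run standalone against any table; a minimal sketch, assuming a plain
Hadoop Configuration and an illustrative table path:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hudi.common.model.WriteOperationType
    import org.apache.hudi.common.table.HoodieTableMetaClient
    import org.apache.hudi.common.table.timeline.TimelineUtils

    val metaClient = HoodieTableMetaClient.builder()
      .setBasePath("/tmp/hudi_table") // illustrative path
      .setConf(new Configuration())
      .build()
    val timeline = metaClient.getActiveTimeline.getAllCommitsTimeline
    val last = timeline.lastInstant()
    if (last.isPresent && last.get().isCompleted) {
      val meta = TimelineUtils.getCommitMetadata(last.get(), timeline)
      // Report whether the most recent completed commit was an upsert.
      val isUpsert = meta.getOperationType == WriteOperationType.UPSERT
      println(s"Last commit operation: ${meta.getOperationType} (upsert: $isUpsert)")
    }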
