This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7061652e1bd [HUDI-6579] Adding support for upsert and deletes with 
spark datasource for pk less table (#9261)
7061652e1bd is described below

commit 7061652e1bda92037ae796edae3d18fb6ec64529
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Sat Aug 5 23:22:46 2023 -0400

    [HUDI-6579] Adding support for upsert and deletes with spark datasource for 
pk less table (#9261)
    
    Adding support for upsert and deletes with spark datasource for pk less 
table.
---
 .../hudi/metadata/HoodieMetadataWriteUtils.java    |   2 +-
 .../apache/hudi/AutoRecordKeyGenerationUtils.scala |   6 +-
 .../main/scala/org/apache/hudi/DefaultSource.scala |  12 +--
 .../org/apache/hudi/HoodieCreateRecordUtils.scala  | 113 ++++++++-------------
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  57 ++++++-----
 .../scala/org/apache/hudi/HoodieWriterUtils.scala  |  34 ++++++-
 .../TestAutoGenerationOfRecordKeys.scala           |  82 ++++++++++-----
 7 files changed, 175 insertions(+), 131 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
index ad87b5287ca..2078896987d 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
@@ -114,7 +114,7 @@ public class HoodieMetadataWriteUtils {
         // we will trigger archive manually, to ensure only regular writer 
invokes it
         .withArchivalConfig(HoodieArchivalConfig.newBuilder()
             .archiveCommitsWith(
-                writeConfig.getMinCommitsToKeep(), 
writeConfig.getMaxCommitsToKeep())
+                writeConfig.getMinCommitsToKeep() + 1, 
writeConfig.getMaxCommitsToKeep() + 1)
             .withAutoArchive(false)
             .build())
         // we will trigger compaction manually, to control the instant times
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AutoRecordKeyGenerationUtils.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AutoRecordKeyGenerationUtils.scala
index ca679acc799..501c563a989 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AutoRecordKeyGenerationUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AutoRecordKeyGenerationUtils.scala
@@ -32,7 +32,7 @@ object AutoRecordKeyGenerationUtils {
   private val log = LoggerFactory.getLogger(getClass)
 
   def mayBeValidateParamsForAutoGenerationOfRecordKeys(parameters: Map[String, 
String], hoodieConfig: HoodieConfig): Unit = {
-    val autoGenerateRecordKeys = 
!parameters.contains(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()) // if 
record key is not configured,
+    val autoGenerateRecordKeys = isAutoGenerateRecordKeys(parameters)
     // hudi will auto generate.
     if (autoGenerateRecordKeys) {
       // de-dup is not supported with auto generation of record keys
@@ -54,4 +54,8 @@ object AutoRecordKeyGenerationUtils {
       log.warn("Precombine field " + 
hoodieConfig.getString(PRECOMBINE_FIELD.key()) + " will be ignored with auto 
record key generation enabled")
     }
   }
+
+  def isAutoGenerateRecordKeys(parameters: Map[String, String]): Boolean = {
+    !parameters.contains(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()) // if 
record key is not configured,
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index 0f159885d89..3e5cf351ba1 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -19,7 +19,7 @@ package org.apache.hudi
 
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.DataSourceReadOptions._
-import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, 
SPARK_SQL_WRITES_PREPPED_KEY, OPERATION, STREAMING_CHECKPOINT_IDENTIFIER}
+import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, 
OPERATION, RECORDKEY_FIELD, SPARK_SQL_WRITES_PREPPED_KEY, 
STREAMING_CHECKPOINT_IDENTIFIER}
 import org.apache.hudi.cdc.CDCRelation
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, 
MERGE_ON_READ}
@@ -29,7 +29,7 @@ import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
 import org.apache.hudi.common.util.ConfigUtils
 import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
-import org.apache.hudi.config.HoodieWriteConfig.{WRITE_CONCURRENCY_MODE, 
SPARK_SQL_MERGE_INTO_PREPPED_KEY}
+import 
org.apache.hudi.config.HoodieWriteConfig.{SPARK_SQL_MERGE_INTO_PREPPED_KEY, 
WRITE_CONCURRENCY_MODE}
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.util.PathUtils
 import org.apache.spark.sql.execution.streaming.{Sink, Source}
@@ -143,13 +143,7 @@ class DefaultSource extends RelationProvider
   override def createRelation(sqlContext: SQLContext,
                               mode: SaveMode,
                               optParams: Map[String, String],
-                              rawDf: DataFrame): BaseRelation = {
-    val df = if (optParams.getOrDefault(SPARK_SQL_WRITES_PREPPED_KEY, 
"false").toBoolean || optParams.getOrDefault(SPARK_SQL_MERGE_INTO_PREPPED_KEY, 
"false").toBoolean) {
-      rawDf // Don't remove meta columns for prepped write.
-    } else {
-      rawDf.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala: _*)
-    }
-
+                              df: DataFrame): BaseRelation = {
     if (optParams.get(OPERATION.key).contains(BOOTSTRAP_OPERATION_OPT_VAL)) {
       HoodieSparkSqlWriter.bootstrap(sqlContext, mode, optParams, df)
       HoodieSparkSqlWriter.cleanup()
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala
index d59edc64bf8..b7d9429331e 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala
@@ -56,8 +56,9 @@ object HoodieCreateRecordUtils {
                                        dataFileSchema: Schema,
                                        operation: WriteOperationType,
                                        instantTime: String,
-                                       isPrepped: Boolean,
-                                       sqlMergeIntoPrepped: Boolean)
+                                       preppedSparkSqlWrites: Boolean,
+                                       preppedSparkSqlMergeInto: Boolean,
+                                       preppedWriteOperation: Boolean)
 
   def createHoodieRecordRdd(args: createHoodieRecordRddArgs) = {
     val df = args.df
@@ -69,27 +70,35 @@ object HoodieCreateRecordUtils {
     val dataFileSchema = args.dataFileSchema
     val operation = args.operation
     val instantTime = args.instantTime
-    val isPrepped = args.isPrepped
-    val sqlMergeIntoPrepped = args.sqlMergeIntoPrepped
+    val preppedSparkSqlWrites = args.preppedSparkSqlWrites
+    val preppedSparkSqlMergeInto = args.preppedSparkSqlMergeInto
+    val preppedWriteOperation = args.preppedWriteOperation
 
     val shouldDropPartitionColumns = 
config.getBoolean(DataSourceWriteOptions.DROP_PARTITION_COLUMNS)
     val recordType = config.getRecordMerger.getRecordType
     val autoGenerateRecordKeys: Boolean = 
!parameters.containsKey(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())
 
-    val shouldCombine = if (!isPrepped && 
WriteOperationType.isInsert(operation)) {
-      parameters(INSERT_DROP_DUPS.key()).toBoolean ||
+    var shouldCombine = false
+    if (preppedWriteOperation && !preppedSparkSqlWrites && 
!preppedSparkSqlMergeInto) {// prepped pk less via spark-ds
+      shouldCombine = false
+    } else {
+      shouldCombine = if (!preppedSparkSqlWrites && 
WriteOperationType.isInsert(operation)) {
+        parameters(INSERT_DROP_DUPS.key()).toBoolean ||
+          parameters.getOrElse(
+            HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(),
+            HoodieWriteConfig.COMBINE_BEFORE_INSERT.defaultValue()
+          ).toBoolean
+      } else if (!preppedSparkSqlWrites && 
WriteOperationType.isUpsert(operation)) {
         parameters.getOrElse(
-          HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(),
-          HoodieWriteConfig.COMBINE_BEFORE_INSERT.defaultValue()
+          HoodieWriteConfig.COMBINE_BEFORE_UPSERT.key(),
+          HoodieWriteConfig.COMBINE_BEFORE_UPSERT.defaultValue()
         ).toBoolean
-    } else if (!isPrepped && WriteOperationType.isUpsert(operation)) {
-      parameters.getOrElse(
-        HoodieWriteConfig.COMBINE_BEFORE_UPSERT.key(),
-        HoodieWriteConfig.COMBINE_BEFORE_UPSERT.defaultValue()
-      ).toBoolean
-    } else {
-      !isPrepped
+      } else {
+        !preppedSparkSqlWrites
+      }
     }
+    // we can skip key generator for prepped flow
+    val usePreppedInsteadOfKeyGen = preppedSparkSqlWrites && 
preppedWriteOperation
 
     // NOTE: Avro's [[Schema]] can't be effectively serialized by JVM native 
serialization framework
     //       (due to containing cyclic refs), therefore we have to convert it 
to string before
@@ -111,24 +120,17 @@ object HoodieCreateRecordUtils {
             
keyGenProps.setProperty(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG, 
String.valueOf(sparkPartitionId))
             
keyGenProps.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG, 
instantTime)
           }
-          val keyGenerator : Option[BaseKeyGenerator] = if (isPrepped) None 
else 
Some(HoodieSparkKeyGeneratorFactory.createKeyGenerator(keyGenProps).asInstanceOf[BaseKeyGenerator])
+          val keyGenerator : Option[BaseKeyGenerator] = if 
(usePreppedInsteadOfKeyGen) None else 
Some(HoodieSparkKeyGeneratorFactory.createKeyGenerator(keyGenProps).asInstanceOf[BaseKeyGenerator])
           val dataFileSchema = new Schema.Parser().parse(dataFileSchemaStr)
           val consistentLogicalTimestampEnabled = parameters.getOrElse(
             
DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
             
DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()).toBoolean
 
           // handle dropping partition columns
-          var validatePreppedRecord = true
           it.map { avroRec =>
-            if (validatePreppedRecord && isPrepped) {
-              // For prepped records, check the first record to make sure it 
has meta fields set.
-              validateMetaFieldsInAvroRecords(avroRec)
-              validatePreppedRecord = false
-            }
-
             val (hoodieKey: HoodieKey, recordLocation: 
Option[HoodieRecordLocation]) = 
HoodieCreateRecordUtils.getHoodieKeyAndMaybeLocationFromAvroRecord(keyGenerator,
 avroRec,
-              isPrepped, sqlMergeIntoPrepped)
-            val avroRecWithoutMeta: GenericRecord = if (isPrepped || 
sqlMergeIntoPrepped) {
+              preppedSparkSqlWrites || preppedWriteOperation, 
preppedSparkSqlWrites || preppedWriteOperation || preppedSparkSqlMergeInto)
+            val avroRecWithoutMeta: GenericRecord = if (preppedSparkSqlWrites 
|| preppedSparkSqlMergeInto || preppedWriteOperation) {
               HoodieAvroUtils.rewriteRecord(avroRec, 
HoodieAvroUtils.removeMetadataFields(dataFileSchema))
             } else {
               avroRec
@@ -166,9 +168,9 @@ object HoodieCreateRecordUtils {
             
keyGenProps.setProperty(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG, 
String.valueOf(sparkPartitionId))
             
keyGenProps.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG, 
instantTime)
           }
-          val sparkKeyGenerator : Option[SparkKeyGeneratorInterface] = if 
(isPrepped) None else 
Some(HoodieSparkKeyGeneratorFactory.createKeyGenerator(keyGenProps).asInstanceOf[SparkKeyGeneratorInterface])
+          val sparkKeyGenerator : Option[SparkKeyGeneratorInterface] = if 
(usePreppedInsteadOfKeyGen) None else 
Some(HoodieSparkKeyGeneratorFactory.createKeyGenerator(keyGenProps).asInstanceOf[SparkKeyGeneratorInterface])
           val targetStructType = if (shouldDropPartitionColumns) 
dataFileStructType else writerStructType
-          val finalStructType = if (isPrepped) {
+          val finalStructType = if (preppedSparkSqlWrites || 
preppedWriteOperation) {
             val fieldsToExclude = 
HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.toArray()
             StructType(targetStructType.fields.filterNot(field => 
fieldsToExclude.contains(field.name)))
           } else {
@@ -177,17 +179,12 @@ object HoodieCreateRecordUtils {
           // NOTE: To make sure we properly transform records
           val finalStructTypeRowWriter = 
getCachedUnsafeRowWriter(sourceStructType, finalStructType)
 
-          var validatePreppedRecord = true
           it.map { sourceRow =>
-            if (isPrepped) {
-              // For prepped records, check the record schema to make sure it 
has meta fields set.
-              validateMetaFieldsInSparkRecords(sourceStructType)
-              validatePreppedRecord = false
-            }
-            val (key: HoodieKey, recordLocation: Option[HoodieRecordLocation]) 
= 
HoodieCreateRecordUtils.getHoodieKeyAndMayBeLocationFromSparkRecord(sparkKeyGenerator,
 sourceRow, sourceStructType, isPrepped, sqlMergeIntoPrepped)
-
+            val (key: HoodieKey, recordLocation: Option[HoodieRecordLocation]) 
=
+              
HoodieCreateRecordUtils.getHoodieKeyAndMayBeLocationFromSparkRecord(sparkKeyGenerator,
 sourceRow, sourceStructType,
+                preppedSparkSqlWrites || preppedWriteOperation, 
preppedSparkSqlWrites || preppedWriteOperation || preppedSparkSqlMergeInto)
             val targetRow = finalStructTypeRowWriter(sourceRow)
-            var hoodieSparkRecord = new HoodieSparkRecord(key, targetRow, 
dataFileStructType, false)
+            val hoodieSparkRecord = new HoodieSparkRecord(key, targetRow, 
dataFileStructType, false)
             if (recordLocation.isDefined) {
               hoodieSparkRecord.setCurrentLocation(recordLocation.get)
             }
@@ -197,52 +194,31 @@ object HoodieCreateRecordUtils {
     }
   }
 
-  private def validateMetaFieldsInSparkRecords(schema: StructType): Boolean = {
-    if (!schema.fieldNames.contains(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
-      || !schema.fieldNames.contains(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD)
-      || !schema.fieldNames.contains(HoodieRecord.RECORD_KEY_METADATA_FIELD)
-      || 
!schema.fieldNames.contains(HoodieRecord.PARTITION_PATH_METADATA_FIELD)
-      || !schema.fieldNames.contains(HoodieRecord.FILENAME_METADATA_FIELD)) {
-      throw new HoodieException("Metadata fields missing from spark prepared 
record.")
-    }
-    true
-  }
-
-  def validateMetaFieldsInAvroRecords(avroRec: GenericRecord): Unit = {
-    val avroSchema = avroRec.getSchema;
-    if (avroSchema.getField(HoodieRecord.RECORD_KEY_METADATA_FIELD) == null
-      || avroSchema.getField(HoodieRecord.COMMIT_TIME_METADATA_FIELD) == null
-      || avroSchema.getField(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD) == null
-      || avroSchema.getField(HoodieRecord.PARTITION_PATH_METADATA_FIELD) == 
null
-      || avroSchema.getField(HoodieRecord.FILENAME_METADATA_FIELD) == null) {
-      throw new HoodieException("Metadata fields missing from avro prepared 
record.")
-    }
-  }
-
   def getHoodieKeyAndMaybeLocationFromAvroRecord(keyGenerator: 
Option[BaseKeyGenerator], avroRec: GenericRecord,
-                                                 isPrepped: Boolean, 
sqlMergeIntoPrepped: Boolean): (HoodieKey, Option[HoodieRecordLocation]) = {
+                                                 
useMetaFieldsForRecordKeyAndPartition: Boolean, 
fetchRecordLocationFromMetaFields: Boolean):
+  (HoodieKey, Option[HoodieRecordLocation]) = {
     //use keygen for sqlMergeIntoPrepped recordKey and partitionPath because 
the keygenerator handles
     //fetching from the meta fields if they are populated and otherwise doing 
keygen
-    val recordKey = if (isPrepped) {
+    val recordKey = if (useMetaFieldsForRecordKeyAndPartition) {
       avroRec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString
     } else {
       keyGenerator.get.getRecordKey(avroRec)
     }
 
-    val partitionPath = if (isPrepped) {
+    val partitionPath = if (useMetaFieldsForRecordKeyAndPartition) {
       avroRec.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString
     } else {
       keyGenerator.get.getPartitionPath(avroRec)
     }
 
     val hoodieKey = new HoodieKey(recordKey, partitionPath)
-    val instantTime: Option[String] = if (isPrepped || sqlMergeIntoPrepped) {
+    val instantTime: Option[String] = if (fetchRecordLocationFromMetaFields) {
       
Option(avroRec.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD)).map(_.toString)
     }
     else {
       None
     }
-    val fileName: Option[String] = if (isPrepped || sqlMergeIntoPrepped) {
+    val fileName: Option[String] = if (fetchRecordLocationFromMetaFields) {
       Option(avroRec.get(HoodieRecord.FILENAME_METADATA_FIELD)).map(_.toString)
     }
     else {
@@ -259,28 +235,29 @@ object HoodieCreateRecordUtils {
 
   def getHoodieKeyAndMayBeLocationFromSparkRecord(sparkKeyGenerator: 
Option[SparkKeyGeneratorInterface],
                                                   sourceRow: InternalRow, 
schema: StructType,
-                                                  isPrepped: Boolean, 
sqlMergeIntoPrepped: Boolean): (HoodieKey, Option[HoodieRecordLocation]) = {
+                                                  
useMetaFieldsForRecordKeyAndPartition: Boolean,
+                                                  
fetchRecordLocationFromMetaFields: Boolean): (HoodieKey, 
Option[HoodieRecordLocation]) = {
     //use keygen for sqlMergeIntoPrepped recordKey and partitionPath because 
the keygenerator handles
     //fetching from the meta fields if they are populated and otherwise doing 
keygen
-    val recordKey = if (isPrepped) {
+    val recordKey = if (useMetaFieldsForRecordKeyAndPartition) {
       sourceRow.getString(HoodieRecord.RECORD_KEY_META_FIELD_ORD)
     } else {
       sparkKeyGenerator.get.getRecordKey(sourceRow, schema).toString
     }
 
-    val partitionPath = if (isPrepped) {
+    val partitionPath = if (useMetaFieldsForRecordKeyAndPartition) {
       sourceRow.getString(HoodieRecord.PARTITION_PATH_META_FIELD_ORD)
     } else {
       sparkKeyGenerator.get.getPartitionPath(sourceRow, schema).toString
     }
 
-    val instantTime: Option[String] = if (isPrepped || sqlMergeIntoPrepped) {
+    val instantTime: Option[String] = if (fetchRecordLocationFromMetaFields) {
       Option(sourceRow.getString(HoodieRecord.COMMIT_TIME_METADATA_FIELD_ORD))
     } else {
       None
     }
 
-    val fileName: Option[String] = if (isPrepped || sqlMergeIntoPrepped) {
+    val fileName: Option[String] = if (fetchRecordLocationFromMetaFields) {
       Option(sourceRow.getString(HoodieRecord.FILENAME_META_FIELD_ORD))
     } else {
       None
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index c7d1a4979bb..cb908f7274c 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -21,7 +21,7 @@ import org.apache.avro.Schema
 import org.apache.avro.generic.GenericData
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
-import 
org.apache.hudi.AutoRecordKeyGenerationUtils.mayBeValidateParamsForAutoGenerationOfRecordKeys
+import org.apache.hudi.AutoRecordKeyGenerationUtils.{isAutoGenerateRecordKeys, 
mayBeValidateParamsForAutoGenerationOfRecordKeys}
 import org.apache.hudi.AvroConversionUtils.{convertAvroSchemaToStructType, 
convertStructTypeToAvroSchema, getAvroRecordNameAndNamespace}
 import 
org.apache.hudi.DataSourceOptionsHelper.fetchMissingWriteConfigsFromTableConfig
 import 
org.apache.hudi.DataSourceUtils.tryOverrideParquetWriteLegacyFormatProperty
@@ -56,7 +56,7 @@ import 
org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils.reconcileN
 import org.apache.hudi.internal.schema.utils.{AvroSchemaEvolutionUtils, 
SerDeHelper}
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
 import 
org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory.getKeyGeneratorClassName
-import org.apache.hudi.keygen.{BaseKeyGenerator, 
TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
+import org.apache.hudi.keygen.{BaseKeyGenerator, KeyGenUtils, 
TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
 import org.apache.hudi.metrics.Metrics
 import org.apache.hudi.sync.common.HoodieSyncConfig
 import org.apache.hudi.sync.common.util.SyncUtilHelpers
@@ -112,7 +112,7 @@ object HoodieSparkSqlWriter {
   def write(sqlContext: SQLContext,
             mode: SaveMode,
             optParams: Map[String, String],
-            df: DataFrame,
+            sourceDf: DataFrame,
             hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty,
             hoodieWriteClient: Option[SparkRDDWriteClient[_]] = Option.empty,
             asyncCompactionTriggerFn: Option[SparkRDDWriteClient[_] => Unit] = 
Option.empty,
@@ -157,7 +157,11 @@ object HoodieSparkSqlWriter {
       case _ => throw new HoodieException("hoodie only support 
org.apache.spark.serializer.KryoSerializer as spark.serializer")
     }
     val tableType = HoodieTableType.valueOf(hoodieConfig.getString(TABLE_TYPE))
-    val operation = deduceOperation(hoodieConfig, paramsWithoutDefaults)
+    val operation = deduceOperation(hoodieConfig, paramsWithoutDefaults, 
sourceDf)
+
+    val preppedSparkSqlMergeInto = 
parameters.getOrDefault(SPARK_SQL_MERGE_INTO_PREPPED_KEY, "false").toBoolean
+    val preppedSparkSqlWrites = 
parameters.getOrDefault(SPARK_SQL_WRITES_PREPPED_KEY, "false").toBoolean
+    val preppedWriteOperation = canDoPreppedWrites(hoodieConfig, parameters, 
operation, sourceDf)
 
     val jsc = new JavaSparkContext(sparkContext)
     if (asyncCompactionTriggerFn.isDefined) {
@@ -222,6 +226,11 @@ object HoodieSparkSqlWriter {
 
       val shouldReconcileSchema = 
parameters(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean
       val latestTableSchemaOpt = getLatestTableSchema(spark, tableIdentifier, 
tableMetaClient)
+      val df = if (preppedWriteOperation || preppedSparkSqlWrites || 
preppedSparkSqlMergeInto) {
+        sourceDf
+      } else {
+        sourceDf.drop(HoodieRecord.HOODIE_META_COLUMNS: _*)
+      }
       // NOTE: We need to make sure that upon conversion of the schemas b/w 
Catalyst's [[StructType]] and
       //       Avro's [[Schema]] we're preserving corresponding "record-name" 
and "record-namespace" that
       //       play crucial role in establishing compatibility b/w schemas
@@ -243,25 +252,18 @@ object HoodieSparkSqlWriter {
       val (writeResult, writeClient: SparkRDDWriteClient[_]) =
         operation match {
           case WriteOperationType.DELETE | WriteOperationType.DELETE_PREPPED =>
+            mayBeValidateParamsForAutoGenerationOfRecordKeys(parameters, 
hoodieConfig)
             val genericRecords = HoodieSparkUtils.createRdd(df, 
avroRecordName, avroRecordNamespace)
             // Convert to RDD[HoodieKey]
-            val isPrepped = 
hoodieConfig.getBooleanOrDefault(SPARK_SQL_WRITES_PREPPED_KEY, false)
-            val keyGenerator: Option[BaseKeyGenerator] = if (isPrepped) {
-              None
-            } else {
-              Some(HoodieSparkKeyGeneratorFactory.createKeyGenerator(new 
TypedProperties(hoodieConfig.getProps))
-                .asInstanceOf[BaseKeyGenerator])
-            }
-
             val hoodieKeysAndLocationsToDelete = 
genericRecords.mapPartitions(it => {
-              var validatePreppedRecord = true
+              val keyGenerator: Option[BaseKeyGenerator] = if 
(preppedSparkSqlWrites || preppedWriteOperation) {
+                None
+              } else {
+                Some(HoodieSparkKeyGeneratorFactory.createKeyGenerator(new 
TypedProperties(hoodieConfig.getProps))
+                  .asInstanceOf[BaseKeyGenerator])
+              }
               it.map { avroRec =>
-                if (validatePreppedRecord && isPrepped) {
-                  // For prepped records, check the first record to make sure 
it has meta fields set.
-                  
HoodieCreateRecordUtils.validateMetaFieldsInAvroRecords(avroRec)
-                  validatePreppedRecord = false
-                }
-                
HoodieCreateRecordUtils.getHoodieKeyAndMaybeLocationFromAvroRecord(keyGenerator,
 avroRec, isPrepped, false)
+                
HoodieCreateRecordUtils.getHoodieKeyAndMaybeLocationFromAvroRecord(keyGenerator,
 avroRec, preppedSparkSqlWrites || preppedWriteOperation, preppedSparkSqlWrites 
|| preppedSparkSqlMergeInto || preppedWriteOperation)
               }
             }).toJavaRDD()
 
@@ -285,7 +287,7 @@ object HoodieSparkSqlWriter {
 
             // Issue deletes
             client.startCommitWithTime(instantTime, commitActionType)
-            val writeStatuses = DataSourceUtils.doDeleteOperation(client, 
hoodieKeysAndLocationsToDelete, instantTime, isPrepped)
+            val writeStatuses = DataSourceUtils.doDeleteOperation(client, 
hoodieKeysAndLocationsToDelete, instantTime, preppedSparkSqlWrites || 
preppedWriteOperation)
             (writeStatuses, client)
 
           case WriteOperationType.DELETE_PARTITION =>
@@ -343,9 +345,7 @@ object HoodieSparkSqlWriter {
             }
 
             // Remove meta columns from writerSchema if isPrepped is true.
-            val isPrepped = 
hoodieConfig.getBooleanOrDefault(SPARK_SQL_WRITES_PREPPED_KEY, false)
-            val sqlMergeIntoPrepped = 
parameters.getOrDefault(SPARK_SQL_MERGE_INTO_PREPPED_KEY, "false").toBoolean
-            val processedDataSchema = if (isPrepped || sqlMergeIntoPrepped) {
+            val processedDataSchema = if (preppedSparkSqlWrites || 
preppedSparkSqlMergeInto || preppedWriteOperation) {
               HoodieAvroUtils.removeMetadataFields(dataFileSchema)
             } else {
               dataFileSchema
@@ -382,7 +382,7 @@ object HoodieSparkSqlWriter {
             val hoodieRecords =
               
HoodieCreateRecordUtils.createHoodieRecordRdd(HoodieCreateRecordUtils.createHoodieRecordRddArgs(df,
                 writeConfig, parameters, avroRecordName, avroRecordNamespace, 
writerSchema,
-                processedDataSchema, operation, instantTime, isPrepped, 
sqlMergeIntoPrepped))
+                processedDataSchema, operation, instantTime, 
preppedSparkSqlWrites, preppedSparkSqlMergeInto, preppedWriteOperation))
 
             val dedupedHoodieRecords =
               if (hoodieConfig.getBoolean(INSERT_DROP_DUPS)) {
@@ -391,7 +391,8 @@ object HoodieSparkSqlWriter {
                 hoodieRecords
               }
             client.startCommitWithTime(instantTime, commitActionType)
-            val writeResult = DataSourceUtils.doWriteOperation(client, 
dedupedHoodieRecords, instantTime, operation, isPrepped)
+            val writeResult = DataSourceUtils.doWriteOperation(client, 
dedupedHoodieRecords, instantTime, operation,
+              preppedSparkSqlWrites || preppedWriteOperation)
             (writeResult, client)
         }
 
@@ -415,7 +416,7 @@ object HoodieSparkSqlWriter {
     }
   }
 
-  def deduceOperation(hoodieConfig: HoodieConfig, paramsWithoutDefaults : 
Map[String, String]): WriteOperationType = {
+  def deduceOperation(hoodieConfig: HoodieConfig, paramsWithoutDefaults : 
Map[String, String], df: Dataset[Row]): WriteOperationType = {
     var operation = 
WriteOperationType.fromValue(hoodieConfig.getString(OPERATION))
     // TODO clean up
     // It does not make sense to allow upsert() operation if INSERT_DROP_DUPS 
is true
@@ -431,9 +432,9 @@ object HoodieSparkSqlWriter {
       operation = WriteOperationType.INSERT
       operation
     } else {
-      // if no record key, we should treat it as append only workload and make 
bulk_insert as operation type.
+      // if no record key, and no meta fields, we should treat it as append 
only workload and make bulk_insert as operation type.
       if 
(!paramsWithoutDefaults.containsKey(DataSourceWriteOptions.RECORDKEY_FIELD.key())
-        && !paramsWithoutDefaults.containsKey(OPERATION.key())) {
+        && !paramsWithoutDefaults.containsKey(OPERATION.key()) && 
!df.schema.fieldNames.contains(HoodieRecord.RECORD_KEY_METADATA_FIELD)) {
         log.warn(s"Choosing BULK_INSERT as the operation type since auto 
record key generation is applicable")
         operation = WriteOperationType.BULK_INSERT
       }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index 5ee56642e31..3d043569835 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -21,14 +21,17 @@ import 
org.apache.hudi.DataSourceOptionsHelper.allAlternatives
 import org.apache.hudi.DataSourceWriteOptions.{RECORD_MERGER_IMPLS, _}
 import org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE
 import org.apache.hudi.common.config.{DFSPropertiesConfiguration, 
HoodieCommonConfig, HoodieConfig, TypedProperties}
+import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType}
 import org.apache.hudi.common.table.HoodieTableConfig
+import 
org.apache.hudi.config.HoodieWriteConfig.SPARK_SQL_MERGE_INTO_PREPPED_KEY
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hive.HiveSyncConfigHolder
 import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator}
 import org.apache.hudi.sync.common.HoodieSyncConfig
 import org.apache.hudi.util.SparkKeyGenUtils
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
 import org.apache.spark.sql.hudi.command.{MergeIntoKeyGenerator, 
SqlKeyGenerator}
+import org.slf4j.LoggerFactory
 
 import java.util.Properties
 import scala.collection.JavaConversions.mapAsJavaMap
@@ -39,6 +42,7 @@ import scala.collection.JavaConverters._
  */
 object HoodieWriterUtils {
 
+  private val log = LoggerFactory.getLogger(getClass)
   /**
     * Add default options for unspecified write options keys.
     *
@@ -86,6 +90,34 @@ object HoodieWriterUtils {
     Map() ++ hoodieConfig.getProps.asScala ++ globalProps ++ 
DataSourceOptionsHelper.translateConfigurations(parameters)
   }
 
+  /**
+   * Determines whether writes need to take prepped path or regular 
non-prepped path.
+   * - For spark-sql writes (UPDATES, DELETES), we could use prepped flow due 
to the presences of meta fields.
+   * - For pkless tables, if incoming df has meta fields, we could use prepped 
flow.
+   * @param hoodieConfig hoodie config of interest.
+   * @param parameters raw parameters.
+   * @param operation operation type.
+   * @param df incoming dataframe
+   * @return true if prepped writes, false otherwise.
+   */
+  def canDoPreppedWrites(hoodieConfig: HoodieConfig, parameters: Map[String, 
String], operation : WriteOperationType, df: Dataset[Row]): Boolean = {
+    var isPrepped = false
+    if (AutoRecordKeyGenerationUtils.isAutoGenerateRecordKeys(parameters)
+      && parameters.getOrElse(SPARK_SQL_WRITES_PREPPED_KEY, 
"false").equals("false")
+      && parameters.getOrElse(SPARK_SQL_MERGE_INTO_PREPPED_KEY, 
"false").equals("false")
+      && 
df.schema.fieldNames.contains(HoodieRecord.RECORD_KEY_METADATA_FIELD)) {
+      // with pk less table, writes using spark-ds writer can potentially use 
the prepped path if meta fields are present in the incoming df.
+      if (operation == WriteOperationType.UPSERT) {
+        log.warn("Changing operation type to UPSERT PREPPED for pk less table 
upserts ")
+        isPrepped = true
+      } else if (operation == WriteOperationType.DELETE) {
+        log.warn("Changing operation type to DELETE PREPPED for pk less table 
deletes ")
+        isPrepped = true
+      }
+    }
+    isPrepped
+  }
+
   /**
    * Fetch params by translating alternatives if any. Do not set any default 
as this method is intended to be called
    * before validation.
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestAutoGenerationOfRecordKeys.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestAutoGenerationOfRecordKeys.scala
index 89a232b5f99..7a9f5b27ead 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestAutoGenerationOfRecordKeys.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestAutoGenerationOfRecordKeys.scala
@@ -21,28 +21,26 @@ package org.apache.hudi.functional
 
 import org.apache.hadoop.fs.FileSystem
 import org.apache.hudi.HoodieConversionUtils.toJavaOption
-import org.apache.hudi.common.config.HoodieMetadataConfig
-import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType, 
WriteOperationType}
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
-import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType}
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
 import org.apache.hudi.common.util
-import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.ExceptionUtil.getRootCause
 import org.apache.hudi.exception.{HoodieException, HoodieKeyGeneratorException}
 import org.apache.hudi.functional.CommonOptionUtils._
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions
-import org.apache.hudi.keygen.{ComplexKeyGenerator, KeyGenUtils, 
NonpartitionedKeyGenerator, SimpleKeyGenerator, TimestampBasedKeyGenerator}
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
+import org.apache.hudi.keygen.{ComplexKeyGenerator, 
NonpartitionedKeyGenerator, SimpleKeyGenerator, TimestampBasedKeyGenerator}
 import org.apache.hudi.testutils.HoodieSparkClientTestBase
 import org.apache.hudi.util.JFunction
 import org.apache.hudi.{DataSourceWriteOptions, HoodieDataSourceHelpers, 
ScalaAssertionSupport}
+import org.apache.spark.sql.functions.lit
 import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
 import org.apache.spark.sql.{SaveMode, SparkSession, SparkSessionExtensions}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{CsvSource, EnumSource}
+import org.junit.jupiter.params.provider.CsvSource
 
 import java.util.function.Consumer
 import scala.collection.JavaConversions._
@@ -124,10 +122,6 @@ class TestAutoGenerationOfRecordKeys extends 
HoodieSparkClientTestBase with Scal
       options = options -- 
Seq(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())
     }
 
-    // add partition Id and instant time
-    options += KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG -> "1"
-    options += KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG -> "100"
-
     // NOTE: In this test we deliberately removing record-key configuration
     //       to validate Hudi is handling this case appropriately
     val writeOpts = options -- Seq(DataSourceWriteOptions.RECORDKEY_FIELD.key)
@@ -176,14 +170,14 @@ class TestAutoGenerationOfRecordKeys extends 
HoodieSparkClientTestBase with Scal
     assertEquals(10, recordKeys.size)
 
     // validate entire batch is present in snapshot read
-    val expectedInputDf = 
inputDF.union(inputDF2).drop("partition","rider","_hoodie_is_deleted")
-    val actualDf = readDF.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala: 
_*).drop("partition","rider","_hoodie_is_deleted")
+    val expectedInputDf = inputDF.union(inputDF2).drop("partition", "rider", 
"_hoodie_is_deleted")
+    val actualDf = readDF.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala: 
_*).drop("partition", "rider", "_hoodie_is_deleted")
     assertEquals(expectedInputDf.except(actualDf).count, 0)
   }
 
   @ParameterizedTest
   @CsvSource(value = Array(
-    
"hoodie.populate.meta.fields,false","hoodie.combine.before.insert,true","hoodie.datasource.write.insert.drop.duplicates,true"
+    "hoodie.populate.meta.fields,false", "hoodie.combine.before.insert,true", 
"hoodie.datasource.write.insert.drop.duplicates,true"
   ))
   def testRecordKeysAutoGenInvalidParams(configKey: String, configValue: 
String): Unit = {
     val (writeOpts, _) = getWriterReaderOpts(HoodieRecordType.AVRO)
@@ -192,10 +186,6 @@ class TestAutoGenerationOfRecordKeys extends 
HoodieSparkClientTestBase with Scal
     //       to validate Hudi is handling this case appropriately
     var opts = writeOpts -- Seq(DataSourceWriteOptions.RECORDKEY_FIELD.key)
 
-    // add partition Id and instant time
-    opts += KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG -> "1"
-    opts += KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG -> "100"
-
     // Insert Operation
     val records = recordsToStrings(dataGen.generateInserts("000", 1)).toList
     val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
@@ -211,7 +201,6 @@ class TestAutoGenerationOfRecordKeys extends 
HoodieSparkClientTestBase with Scal
     assertTrue(getRootCause(e).getMessage.contains(configKey + " is not 
supported with auto generation of record keys"))
   }
 
-
   @Test
   def testRecordKeysAutoGenEnableToDisable(): Unit = {
     val (vanillaWriteOpts, readOpts) = 
getWriterReaderOpts(HoodieRecordType.AVRO)
@@ -228,11 +217,6 @@ class TestAutoGenerationOfRecordKeys extends 
HoodieSparkClientTestBase with Scal
     val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
     inputDF.cache
 
-
-    // add partition Id and instant time
-    writeOpts += KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG -> "1"
-    writeOpts += KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG -> "100"
-
     //
     // Step #1: Persist first batch with auto-gen'd record-keys
     //
@@ -277,4 +261,56 @@ class TestAutoGenerationOfRecordKeys extends 
HoodieSparkClientTestBase with Scal
     val snapshot0 = spark.read.format("hudi").load(basePath)
     assertEquals(5, snapshot0.count())
   }
+
+  @Test
+  def testUpsertsAndDeletesWithPkLess(): Unit = {
+    val (vanillaWriteOpts, readOpts) = 
getWriterReaderOpts(HoodieRecordType.AVRO)
+
+    var options: Map[String, String] = vanillaWriteOpts ++ Map(
+      DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> 
classOf[SimpleKeyGenerator].getCanonicalName)
+
+    var writeOpts = options -- Seq(DataSourceWriteOptions.RECORDKEY_FIELD.key)
+
+    // Insert Operation
+    val records = recordsToStrings(dataGen.generateInserts("000", 20)).toList
+    val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+    inputDF.cache
+
+    inputDF.write.format("hudi")
+      .options(writeOpts)
+      .option(DataSourceWriteOptions.OPERATION.key, "insert")
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
+
+    val snapshotDf = spark.read.format("hudi").load(basePath)
+    snapshotDf.cache()
+    assertEquals(snapshotDf.count(), 20)
+
+    val updateDf = snapshotDf.limit(5).withColumn("rider", lit("rider-123456"))
+    updateDf.write.format("hudi")
+      .options(writeOpts)
+      .mode(SaveMode.Append)
+      .save(basePath)
+
+    val snapshotDf1 = spark.read.format("hudi").load(basePath)
+    snapshotDf1.cache()
+
+    assertEquals(20, snapshotDf1.count())
+    assertEquals(5, snapshotDf1.filter("rider == 'rider-123456'").count())
+
+    // delete the same 5 records.
+    snapshotDf1.filter("rider == 'rider-123456'")
+      .write.format("hudi")
+      .options(writeOpts)
+      .option(DataSourceWriteOptions.OPERATION.key, "delete")
+      .mode(SaveMode.Append)
+      .save(basePath)
+
+    val snapshotDf2 = spark.read.format("hudi").load(basePath)
+    snapshotDf2.cache()
+    assertEquals(15, snapshotDf2.count())
+    assertEquals(0, snapshotDf2.filter("rider == 'rider-123456'").count())
+  }
 }


Reply via email to