This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 7061652e1bd [HUDI-6579] Adding support for upsert and deletes with
spark datasource for pk less table (#9261)
7061652e1bd is described below
commit 7061652e1bda92037ae796edae3d18fb6ec64529
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Sat Aug 5 23:22:46 2023 -0400
[HUDI-6579] Adding support for upsert and deletes with spark datasource for
pk less table (#9261)
Adding support for upserts and deletes with the Spark datasource for
primary-key-less (pk-less) tables.
---
.../hudi/metadata/HoodieMetadataWriteUtils.java | 2 +-
.../apache/hudi/AutoRecordKeyGenerationUtils.scala | 6 +-
.../main/scala/org/apache/hudi/DefaultSource.scala | 12 +--
.../org/apache/hudi/HoodieCreateRecordUtils.scala | 113 ++++++++-------------
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 57 ++++++-----
.../scala/org/apache/hudi/HoodieWriterUtils.scala | 34 ++++++-
.../TestAutoGenerationOfRecordKeys.scala | 82 ++++++++++-----
7 files changed, 175 insertions(+), 131 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
index ad87b5287ca..2078896987d 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
@@ -114,7 +114,7 @@ public class HoodieMetadataWriteUtils {
// we will trigger archive manually, to ensure only regular writer
invokes it
.withArchivalConfig(HoodieArchivalConfig.newBuilder()
.archiveCommitsWith(
- writeConfig.getMinCommitsToKeep(),
writeConfig.getMaxCommitsToKeep())
+ writeConfig.getMinCommitsToKeep() + 1,
writeConfig.getMaxCommitsToKeep() + 1)
.withAutoArchive(false)
.build())
// we will trigger compaction manually, to control the instant times
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AutoRecordKeyGenerationUtils.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AutoRecordKeyGenerationUtils.scala
index ca679acc799..501c563a989 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AutoRecordKeyGenerationUtils.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AutoRecordKeyGenerationUtils.scala
@@ -32,7 +32,7 @@ object AutoRecordKeyGenerationUtils {
private val log = LoggerFactory.getLogger(getClass)
def mayBeValidateParamsForAutoGenerationOfRecordKeys(parameters: Map[String,
String], hoodieConfig: HoodieConfig): Unit = {
- val autoGenerateRecordKeys =
!parameters.contains(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()) // if
record key is not configured,
+ val autoGenerateRecordKeys = isAutoGenerateRecordKeys(parameters)
// hudi will auto generate.
if (autoGenerateRecordKeys) {
// de-dup is not supported with auto generation of record keys
@@ -54,4 +54,8 @@ object AutoRecordKeyGenerationUtils {
log.warn("Precombine field " +
hoodieConfig.getString(PRECOMBINE_FIELD.key()) + " will be ignored with auto
record key generation enabled")
}
}
+
+ def isAutoGenerateRecordKeys(parameters: Map[String, String]): Boolean = {
+ !parameters.contains(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()) // if
record key is not configured,
+ }
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index 0f159885d89..3e5cf351ba1 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -19,7 +19,7 @@ package org.apache.hudi
import org.apache.hadoop.fs.Path
import org.apache.hudi.DataSourceReadOptions._
-import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL,
SPARK_SQL_WRITES_PREPPED_KEY, OPERATION, STREAMING_CHECKPOINT_IDENTIFIER}
+import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL,
OPERATION, RECORDKEY_FIELD, SPARK_SQL_WRITES_PREPPED_KEY,
STREAMING_CHECKPOINT_IDENTIFIER}
import org.apache.hudi.cdc.CDCRelation
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE,
MERGE_ON_READ}
@@ -29,7 +29,7 @@ import org.apache.hudi.common.table.{HoodieTableMetaClient,
TableSchemaResolver}
import org.apache.hudi.common.util.ConfigUtils
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
-import org.apache.hudi.config.HoodieWriteConfig.{WRITE_CONCURRENCY_MODE,
SPARK_SQL_MERGE_INTO_PREPPED_KEY}
+import
org.apache.hudi.config.HoodieWriteConfig.{SPARK_SQL_MERGE_INTO_PREPPED_KEY,
WRITE_CONCURRENCY_MODE}
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.util.PathUtils
import org.apache.spark.sql.execution.streaming.{Sink, Source}
@@ -143,13 +143,7 @@ class DefaultSource extends RelationProvider
override def createRelation(sqlContext: SQLContext,
mode: SaveMode,
optParams: Map[String, String],
- rawDf: DataFrame): BaseRelation = {
- val df = if (optParams.getOrDefault(SPARK_SQL_WRITES_PREPPED_KEY,
"false").toBoolean || optParams.getOrDefault(SPARK_SQL_MERGE_INTO_PREPPED_KEY,
"false").toBoolean) {
- rawDf // Don't remove meta columns for prepped write.
- } else {
- rawDf.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala: _*)
- }
-
+ df: DataFrame): BaseRelation = {
if (optParams.get(OPERATION.key).contains(BOOTSTRAP_OPERATION_OPT_VAL)) {
HoodieSparkSqlWriter.bootstrap(sqlContext, mode, optParams, df)
HoodieSparkSqlWriter.cleanup()
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala
index d59edc64bf8..b7d9429331e 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala
@@ -56,8 +56,9 @@ object HoodieCreateRecordUtils {
dataFileSchema: Schema,
operation: WriteOperationType,
instantTime: String,
- isPrepped: Boolean,
- sqlMergeIntoPrepped: Boolean)
+ preppedSparkSqlWrites: Boolean,
+ preppedSparkSqlMergeInto: Boolean,
+ preppedWriteOperation: Boolean)
def createHoodieRecordRdd(args: createHoodieRecordRddArgs) = {
val df = args.df
@@ -69,27 +70,35 @@ object HoodieCreateRecordUtils {
val dataFileSchema = args.dataFileSchema
val operation = args.operation
val instantTime = args.instantTime
- val isPrepped = args.isPrepped
- val sqlMergeIntoPrepped = args.sqlMergeIntoPrepped
+ val preppedSparkSqlWrites = args.preppedSparkSqlWrites
+ val preppedSparkSqlMergeInto = args.preppedSparkSqlMergeInto
+ val preppedWriteOperation = args.preppedWriteOperation
val shouldDropPartitionColumns =
config.getBoolean(DataSourceWriteOptions.DROP_PARTITION_COLUMNS)
val recordType = config.getRecordMerger.getRecordType
val autoGenerateRecordKeys: Boolean =
!parameters.containsKey(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())
- val shouldCombine = if (!isPrepped &&
WriteOperationType.isInsert(operation)) {
- parameters(INSERT_DROP_DUPS.key()).toBoolean ||
+ var shouldCombine = false
+ if (preppedWriteOperation && !preppedSparkSqlWrites &&
!preppedSparkSqlMergeInto) {// prepped pk less via spark-ds
+ shouldCombine = false
+ } else {
+ shouldCombine = if (!preppedSparkSqlWrites &&
WriteOperationType.isInsert(operation)) {
+ parameters(INSERT_DROP_DUPS.key()).toBoolean ||
+ parameters.getOrElse(
+ HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(),
+ HoodieWriteConfig.COMBINE_BEFORE_INSERT.defaultValue()
+ ).toBoolean
+ } else if (!preppedSparkSqlWrites &&
WriteOperationType.isUpsert(operation)) {
parameters.getOrElse(
- HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(),
- HoodieWriteConfig.COMBINE_BEFORE_INSERT.defaultValue()
+ HoodieWriteConfig.COMBINE_BEFORE_UPSERT.key(),
+ HoodieWriteConfig.COMBINE_BEFORE_UPSERT.defaultValue()
).toBoolean
- } else if (!isPrepped && WriteOperationType.isUpsert(operation)) {
- parameters.getOrElse(
- HoodieWriteConfig.COMBINE_BEFORE_UPSERT.key(),
- HoodieWriteConfig.COMBINE_BEFORE_UPSERT.defaultValue()
- ).toBoolean
- } else {
- !isPrepped
+ } else {
+ !preppedSparkSqlWrites
+ }
}
+ // we can skip key generator for prepped flow
+ val usePreppedInsteadOfKeyGen = preppedSparkSqlWrites &&
preppedWriteOperation
// NOTE: Avro's [[Schema]] can't be effectively serialized by JVM native
serialization framework
// (due to containing cyclic refs), therefore we have to convert it
to string before
@@ -111,24 +120,17 @@ object HoodieCreateRecordUtils {
keyGenProps.setProperty(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG,
String.valueOf(sparkPartitionId))
keyGenProps.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG,
instantTime)
}
- val keyGenerator : Option[BaseKeyGenerator] = if (isPrepped) None
else
Some(HoodieSparkKeyGeneratorFactory.createKeyGenerator(keyGenProps).asInstanceOf[BaseKeyGenerator])
+ val keyGenerator : Option[BaseKeyGenerator] = if
(usePreppedInsteadOfKeyGen) None else
Some(HoodieSparkKeyGeneratorFactory.createKeyGenerator(keyGenProps).asInstanceOf[BaseKeyGenerator])
val dataFileSchema = new Schema.Parser().parse(dataFileSchemaStr)
val consistentLogicalTimestampEnabled = parameters.getOrElse(
DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()).toBoolean
// handle dropping partition columns
- var validatePreppedRecord = true
it.map { avroRec =>
- if (validatePreppedRecord && isPrepped) {
- // For prepped records, check the first record to make sure it
has meta fields set.
- validateMetaFieldsInAvroRecords(avroRec)
- validatePreppedRecord = false
- }
-
val (hoodieKey: HoodieKey, recordLocation:
Option[HoodieRecordLocation]) =
HoodieCreateRecordUtils.getHoodieKeyAndMaybeLocationFromAvroRecord(keyGenerator,
avroRec,
- isPrepped, sqlMergeIntoPrepped)
- val avroRecWithoutMeta: GenericRecord = if (isPrepped ||
sqlMergeIntoPrepped) {
+ preppedSparkSqlWrites || preppedWriteOperation,
preppedSparkSqlWrites || preppedWriteOperation || preppedSparkSqlMergeInto)
+ val avroRecWithoutMeta: GenericRecord = if (preppedSparkSqlWrites
|| preppedSparkSqlMergeInto || preppedWriteOperation) {
HoodieAvroUtils.rewriteRecord(avroRec,
HoodieAvroUtils.removeMetadataFields(dataFileSchema))
} else {
avroRec
@@ -166,9 +168,9 @@ object HoodieCreateRecordUtils {
keyGenProps.setProperty(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG,
String.valueOf(sparkPartitionId))
keyGenProps.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG,
instantTime)
}
- val sparkKeyGenerator : Option[SparkKeyGeneratorInterface] = if
(isPrepped) None else
Some(HoodieSparkKeyGeneratorFactory.createKeyGenerator(keyGenProps).asInstanceOf[SparkKeyGeneratorInterface])
+ val sparkKeyGenerator : Option[SparkKeyGeneratorInterface] = if
(usePreppedInsteadOfKeyGen) None else
Some(HoodieSparkKeyGeneratorFactory.createKeyGenerator(keyGenProps).asInstanceOf[SparkKeyGeneratorInterface])
val targetStructType = if (shouldDropPartitionColumns)
dataFileStructType else writerStructType
- val finalStructType = if (isPrepped) {
+ val finalStructType = if (preppedSparkSqlWrites ||
preppedWriteOperation) {
val fieldsToExclude =
HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.toArray()
StructType(targetStructType.fields.filterNot(field =>
fieldsToExclude.contains(field.name)))
} else {
@@ -177,17 +179,12 @@ object HoodieCreateRecordUtils {
// NOTE: To make sure we properly transform records
val finalStructTypeRowWriter =
getCachedUnsafeRowWriter(sourceStructType, finalStructType)
- var validatePreppedRecord = true
it.map { sourceRow =>
- if (isPrepped) {
- // For prepped records, check the record schema to make sure it
has meta fields set.
- validateMetaFieldsInSparkRecords(sourceStructType)
- validatePreppedRecord = false
- }
- val (key: HoodieKey, recordLocation: Option[HoodieRecordLocation])
=
HoodieCreateRecordUtils.getHoodieKeyAndMayBeLocationFromSparkRecord(sparkKeyGenerator,
sourceRow, sourceStructType, isPrepped, sqlMergeIntoPrepped)
-
+ val (key: HoodieKey, recordLocation: Option[HoodieRecordLocation])
=
+
HoodieCreateRecordUtils.getHoodieKeyAndMayBeLocationFromSparkRecord(sparkKeyGenerator,
sourceRow, sourceStructType,
+ preppedSparkSqlWrites || preppedWriteOperation,
preppedSparkSqlWrites || preppedWriteOperation || preppedSparkSqlMergeInto)
val targetRow = finalStructTypeRowWriter(sourceRow)
- var hoodieSparkRecord = new HoodieSparkRecord(key, targetRow,
dataFileStructType, false)
+ val hoodieSparkRecord = new HoodieSparkRecord(key, targetRow,
dataFileStructType, false)
if (recordLocation.isDefined) {
hoodieSparkRecord.setCurrentLocation(recordLocation.get)
}
@@ -197,52 +194,31 @@ object HoodieCreateRecordUtils {
}
}
- private def validateMetaFieldsInSparkRecords(schema: StructType): Boolean = {
- if (!schema.fieldNames.contains(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
- || !schema.fieldNames.contains(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD)
- || !schema.fieldNames.contains(HoodieRecord.RECORD_KEY_METADATA_FIELD)
- ||
!schema.fieldNames.contains(HoodieRecord.PARTITION_PATH_METADATA_FIELD)
- || !schema.fieldNames.contains(HoodieRecord.FILENAME_METADATA_FIELD)) {
- throw new HoodieException("Metadata fields missing from spark prepared
record.")
- }
- true
- }
-
- def validateMetaFieldsInAvroRecords(avroRec: GenericRecord): Unit = {
- val avroSchema = avroRec.getSchema;
- if (avroSchema.getField(HoodieRecord.RECORD_KEY_METADATA_FIELD) == null
- || avroSchema.getField(HoodieRecord.COMMIT_TIME_METADATA_FIELD) == null
- || avroSchema.getField(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD) == null
- || avroSchema.getField(HoodieRecord.PARTITION_PATH_METADATA_FIELD) ==
null
- || avroSchema.getField(HoodieRecord.FILENAME_METADATA_FIELD) == null) {
- throw new HoodieException("Metadata fields missing from avro prepared
record.")
- }
- }
-
def getHoodieKeyAndMaybeLocationFromAvroRecord(keyGenerator:
Option[BaseKeyGenerator], avroRec: GenericRecord,
- isPrepped: Boolean,
sqlMergeIntoPrepped: Boolean): (HoodieKey, Option[HoodieRecordLocation]) = {
+
useMetaFieldsForRecordKeyAndPartition: Boolean,
fetchRecordLocationFromMetaFields: Boolean):
+ (HoodieKey, Option[HoodieRecordLocation]) = {
//use keygen for sqlMergeIntoPrepped recordKey and partitionPath because
the keygenerator handles
//fetching from the meta fields if they are populated and otherwise doing
keygen
- val recordKey = if (isPrepped) {
+ val recordKey = if (useMetaFieldsForRecordKeyAndPartition) {
avroRec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString
} else {
keyGenerator.get.getRecordKey(avroRec)
}
- val partitionPath = if (isPrepped) {
+ val partitionPath = if (useMetaFieldsForRecordKeyAndPartition) {
avroRec.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString
} else {
keyGenerator.get.getPartitionPath(avroRec)
}
val hoodieKey = new HoodieKey(recordKey, partitionPath)
- val instantTime: Option[String] = if (isPrepped || sqlMergeIntoPrepped) {
+ val instantTime: Option[String] = if (fetchRecordLocationFromMetaFields) {
Option(avroRec.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD)).map(_.toString)
}
else {
None
}
- val fileName: Option[String] = if (isPrepped || sqlMergeIntoPrepped) {
+ val fileName: Option[String] = if (fetchRecordLocationFromMetaFields) {
Option(avroRec.get(HoodieRecord.FILENAME_METADATA_FIELD)).map(_.toString)
}
else {
@@ -259,28 +235,29 @@ object HoodieCreateRecordUtils {
def getHoodieKeyAndMayBeLocationFromSparkRecord(sparkKeyGenerator:
Option[SparkKeyGeneratorInterface],
sourceRow: InternalRow,
schema: StructType,
- isPrepped: Boolean,
sqlMergeIntoPrepped: Boolean): (HoodieKey, Option[HoodieRecordLocation]) = {
+
useMetaFieldsForRecordKeyAndPartition: Boolean,
+
fetchRecordLocationFromMetaFields: Boolean): (HoodieKey,
Option[HoodieRecordLocation]) = {
//use keygen for sqlMergeIntoPrepped recordKey and partitionPath because
the keygenerator handles
//fetching from the meta fields if they are populated and otherwise doing
keygen
- val recordKey = if (isPrepped) {
+ val recordKey = if (useMetaFieldsForRecordKeyAndPartition) {
sourceRow.getString(HoodieRecord.RECORD_KEY_META_FIELD_ORD)
} else {
sparkKeyGenerator.get.getRecordKey(sourceRow, schema).toString
}
- val partitionPath = if (isPrepped) {
+ val partitionPath = if (useMetaFieldsForRecordKeyAndPartition) {
sourceRow.getString(HoodieRecord.PARTITION_PATH_META_FIELD_ORD)
} else {
sparkKeyGenerator.get.getPartitionPath(sourceRow, schema).toString
}
- val instantTime: Option[String] = if (isPrepped || sqlMergeIntoPrepped) {
+ val instantTime: Option[String] = if (fetchRecordLocationFromMetaFields) {
Option(sourceRow.getString(HoodieRecord.COMMIT_TIME_METADATA_FIELD_ORD))
} else {
None
}
- val fileName: Option[String] = if (isPrepped || sqlMergeIntoPrepped) {
+ val fileName: Option[String] = if (fetchRecordLocationFromMetaFields) {
Option(sourceRow.getString(HoodieRecord.FILENAME_META_FIELD_ORD))
} else {
None
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index c7d1a4979bb..cb908f7274c 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -21,7 +21,7 @@ import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
-import
org.apache.hudi.AutoRecordKeyGenerationUtils.mayBeValidateParamsForAutoGenerationOfRecordKeys
+import org.apache.hudi.AutoRecordKeyGenerationUtils.{isAutoGenerateRecordKeys,
mayBeValidateParamsForAutoGenerationOfRecordKeys}
import org.apache.hudi.AvroConversionUtils.{convertAvroSchemaToStructType,
convertStructTypeToAvroSchema, getAvroRecordNameAndNamespace}
import
org.apache.hudi.DataSourceOptionsHelper.fetchMissingWriteConfigsFromTableConfig
import
org.apache.hudi.DataSourceUtils.tryOverrideParquetWriteLegacyFormatProperty
@@ -56,7 +56,7 @@ import
org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils.reconcileN
import org.apache.hudi.internal.schema.utils.{AvroSchemaEvolutionUtils,
SerDeHelper}
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import
org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory.getKeyGeneratorClassName
-import org.apache.hudi.keygen.{BaseKeyGenerator,
TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
+import org.apache.hudi.keygen.{BaseKeyGenerator, KeyGenUtils,
TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
import org.apache.hudi.metrics.Metrics
import org.apache.hudi.sync.common.HoodieSyncConfig
import org.apache.hudi.sync.common.util.SyncUtilHelpers
@@ -112,7 +112,7 @@ object HoodieSparkSqlWriter {
def write(sqlContext: SQLContext,
mode: SaveMode,
optParams: Map[String, String],
- df: DataFrame,
+ sourceDf: DataFrame,
hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty,
hoodieWriteClient: Option[SparkRDDWriteClient[_]] = Option.empty,
asyncCompactionTriggerFn: Option[SparkRDDWriteClient[_] => Unit] =
Option.empty,
@@ -157,7 +157,11 @@ object HoodieSparkSqlWriter {
case _ => throw new HoodieException("hoodie only support
org.apache.spark.serializer.KryoSerializer as spark.serializer")
}
val tableType = HoodieTableType.valueOf(hoodieConfig.getString(TABLE_TYPE))
- val operation = deduceOperation(hoodieConfig, paramsWithoutDefaults)
+ val operation = deduceOperation(hoodieConfig, paramsWithoutDefaults,
sourceDf)
+
+ val preppedSparkSqlMergeInto =
parameters.getOrDefault(SPARK_SQL_MERGE_INTO_PREPPED_KEY, "false").toBoolean
+ val preppedSparkSqlWrites =
parameters.getOrDefault(SPARK_SQL_WRITES_PREPPED_KEY, "false").toBoolean
+ val preppedWriteOperation = canDoPreppedWrites(hoodieConfig, parameters,
operation, sourceDf)
val jsc = new JavaSparkContext(sparkContext)
if (asyncCompactionTriggerFn.isDefined) {
@@ -222,6 +226,11 @@ object HoodieSparkSqlWriter {
val shouldReconcileSchema =
parameters(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean
val latestTableSchemaOpt = getLatestTableSchema(spark, tableIdentifier,
tableMetaClient)
+ val df = if (preppedWriteOperation || preppedSparkSqlWrites ||
preppedSparkSqlMergeInto) {
+ sourceDf
+ } else {
+ sourceDf.drop(HoodieRecord.HOODIE_META_COLUMNS: _*)
+ }
// NOTE: We need to make sure that upon conversion of the schemas b/w
Catalyst's [[StructType]] and
// Avro's [[Schema]] we're preserving corresponding "record-name"
and "record-namespace" that
// play crucial role in establishing compatibility b/w schemas
@@ -243,25 +252,18 @@ object HoodieSparkSqlWriter {
val (writeResult, writeClient: SparkRDDWriteClient[_]) =
operation match {
case WriteOperationType.DELETE | WriteOperationType.DELETE_PREPPED =>
+ mayBeValidateParamsForAutoGenerationOfRecordKeys(parameters,
hoodieConfig)
val genericRecords = HoodieSparkUtils.createRdd(df,
avroRecordName, avroRecordNamespace)
// Convert to RDD[HoodieKey]
- val isPrepped =
hoodieConfig.getBooleanOrDefault(SPARK_SQL_WRITES_PREPPED_KEY, false)
- val keyGenerator: Option[BaseKeyGenerator] = if (isPrepped) {
- None
- } else {
- Some(HoodieSparkKeyGeneratorFactory.createKeyGenerator(new
TypedProperties(hoodieConfig.getProps))
- .asInstanceOf[BaseKeyGenerator])
- }
-
val hoodieKeysAndLocationsToDelete =
genericRecords.mapPartitions(it => {
- var validatePreppedRecord = true
+ val keyGenerator: Option[BaseKeyGenerator] = if
(preppedSparkSqlWrites || preppedWriteOperation) {
+ None
+ } else {
+ Some(HoodieSparkKeyGeneratorFactory.createKeyGenerator(new
TypedProperties(hoodieConfig.getProps))
+ .asInstanceOf[BaseKeyGenerator])
+ }
it.map { avroRec =>
- if (validatePreppedRecord && isPrepped) {
- // For prepped records, check the first record to make sure
it has meta fields set.
-
HoodieCreateRecordUtils.validateMetaFieldsInAvroRecords(avroRec)
- validatePreppedRecord = false
- }
-
HoodieCreateRecordUtils.getHoodieKeyAndMaybeLocationFromAvroRecord(keyGenerator,
avroRec, isPrepped, false)
+
HoodieCreateRecordUtils.getHoodieKeyAndMaybeLocationFromAvroRecord(keyGenerator,
avroRec, preppedSparkSqlWrites || preppedWriteOperation, preppedSparkSqlWrites
|| preppedSparkSqlMergeInto || preppedWriteOperation)
}
}).toJavaRDD()
@@ -285,7 +287,7 @@ object HoodieSparkSqlWriter {
// Issue deletes
client.startCommitWithTime(instantTime, commitActionType)
- val writeStatuses = DataSourceUtils.doDeleteOperation(client,
hoodieKeysAndLocationsToDelete, instantTime, isPrepped)
+ val writeStatuses = DataSourceUtils.doDeleteOperation(client,
hoodieKeysAndLocationsToDelete, instantTime, preppedSparkSqlWrites ||
preppedWriteOperation)
(writeStatuses, client)
case WriteOperationType.DELETE_PARTITION =>
@@ -343,9 +345,7 @@ object HoodieSparkSqlWriter {
}
// Remove meta columns from writerSchema if isPrepped is true.
- val isPrepped =
hoodieConfig.getBooleanOrDefault(SPARK_SQL_WRITES_PREPPED_KEY, false)
- val sqlMergeIntoPrepped =
parameters.getOrDefault(SPARK_SQL_MERGE_INTO_PREPPED_KEY, "false").toBoolean
- val processedDataSchema = if (isPrepped || sqlMergeIntoPrepped) {
+ val processedDataSchema = if (preppedSparkSqlWrites ||
preppedSparkSqlMergeInto || preppedWriteOperation) {
HoodieAvroUtils.removeMetadataFields(dataFileSchema)
} else {
dataFileSchema
@@ -382,7 +382,7 @@ object HoodieSparkSqlWriter {
val hoodieRecords =
HoodieCreateRecordUtils.createHoodieRecordRdd(HoodieCreateRecordUtils.createHoodieRecordRddArgs(df,
writeConfig, parameters, avroRecordName, avroRecordNamespace,
writerSchema,
- processedDataSchema, operation, instantTime, isPrepped,
sqlMergeIntoPrepped))
+ processedDataSchema, operation, instantTime,
preppedSparkSqlWrites, preppedSparkSqlMergeInto, preppedWriteOperation))
val dedupedHoodieRecords =
if (hoodieConfig.getBoolean(INSERT_DROP_DUPS)) {
@@ -391,7 +391,8 @@ object HoodieSparkSqlWriter {
hoodieRecords
}
client.startCommitWithTime(instantTime, commitActionType)
- val writeResult = DataSourceUtils.doWriteOperation(client,
dedupedHoodieRecords, instantTime, operation, isPrepped)
+ val writeResult = DataSourceUtils.doWriteOperation(client,
dedupedHoodieRecords, instantTime, operation,
+ preppedSparkSqlWrites || preppedWriteOperation)
(writeResult, client)
}
@@ -415,7 +416,7 @@ object HoodieSparkSqlWriter {
}
}
- def deduceOperation(hoodieConfig: HoodieConfig, paramsWithoutDefaults :
Map[String, String]): WriteOperationType = {
+ def deduceOperation(hoodieConfig: HoodieConfig, paramsWithoutDefaults :
Map[String, String], df: Dataset[Row]): WriteOperationType = {
var operation =
WriteOperationType.fromValue(hoodieConfig.getString(OPERATION))
// TODO clean up
// It does not make sense to allow upsert() operation if INSERT_DROP_DUPS
is true
@@ -431,9 +432,9 @@ object HoodieSparkSqlWriter {
operation = WriteOperationType.INSERT
operation
} else {
- // if no record key, we should treat it as append only workload and make
bulk_insert as operation type.
+ // if no record key, and no meta fields, we should treat it as append
only workload and make bulk_insert as operation type.
if
(!paramsWithoutDefaults.containsKey(DataSourceWriteOptions.RECORDKEY_FIELD.key())
- && !paramsWithoutDefaults.containsKey(OPERATION.key())) {
+ && !paramsWithoutDefaults.containsKey(OPERATION.key()) &&
!df.schema.fieldNames.contains(HoodieRecord.RECORD_KEY_METADATA_FIELD)) {
log.warn(s"Choosing BULK_INSERT as the operation type since auto
record key generation is applicable")
operation = WriteOperationType.BULK_INSERT
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index 5ee56642e31..3d043569835 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -21,14 +21,17 @@ import
org.apache.hudi.DataSourceOptionsHelper.allAlternatives
import org.apache.hudi.DataSourceWriteOptions.{RECORD_MERGER_IMPLS, _}
import org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE
import org.apache.hudi.common.config.{DFSPropertiesConfiguration,
HoodieCommonConfig, HoodieConfig, TypedProperties}
+import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType}
import org.apache.hudi.common.table.HoodieTableConfig
+import
org.apache.hudi.config.HoodieWriteConfig.SPARK_SQL_MERGE_INTO_PREPPED_KEY
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hive.HiveSyncConfigHolder
import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator}
import org.apache.hudi.sync.common.HoodieSyncConfig
import org.apache.hudi.util.SparkKeyGenUtils
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.sql.hudi.command.{MergeIntoKeyGenerator,
SqlKeyGenerator}
+import org.slf4j.LoggerFactory
import java.util.Properties
import scala.collection.JavaConversions.mapAsJavaMap
@@ -39,6 +42,7 @@ import scala.collection.JavaConverters._
*/
object HoodieWriterUtils {
+ private val log = LoggerFactory.getLogger(getClass)
/**
* Add default options for unspecified write options keys.
*
@@ -86,6 +90,34 @@ object HoodieWriterUtils {
Map() ++ hoodieConfig.getProps.asScala ++ globalProps ++
DataSourceOptionsHelper.translateConfigurations(parameters)
}
+ /**
+ * Determines whether writes need to take prepped path or regular
non-prepped path.
+ * - For spark-sql writes (UPDATES, DELETES), we could use prepped flow due
to the presence of meta fields.
+ * - For pk-less tables, if incoming df has meta fields, we could use prepped
flow.
+ * @param hoodieConfig hoodie config of interest.
+ * @param parameters raw parameters.
+ * @param operation operation type.
+ * @param df incoming dataframe
+ * @return true if prepped writes, false otherwise.
+ */
+ def canDoPreppedWrites(hoodieConfig: HoodieConfig, parameters: Map[String,
String], operation : WriteOperationType, df: Dataset[Row]): Boolean = {
+ var isPrepped = false
+ if (AutoRecordKeyGenerationUtils.isAutoGenerateRecordKeys(parameters)
+ && parameters.getOrElse(SPARK_SQL_WRITES_PREPPED_KEY,
"false").equals("false")
+ && parameters.getOrElse(SPARK_SQL_MERGE_INTO_PREPPED_KEY,
"false").equals("false")
+ &&
df.schema.fieldNames.contains(HoodieRecord.RECORD_KEY_METADATA_FIELD)) {
+ // with pk less table, writes using spark-ds writer can potentially use
the prepped path if meta fields are present in the incoming df.
+ if (operation == WriteOperationType.UPSERT) {
+ log.warn("Changing operation type to UPSERT PREPPED for pk less table
upserts ")
+ isPrepped = true
+ } else if (operation == WriteOperationType.DELETE) {
+ log.warn("Changing operation type to DELETE PREPPED for pk less table
deletes ")
+ isPrepped = true
+ }
+ }
+ isPrepped
+ }
+
/**
* Fetch params by translating alternatives if any. Do not set any default
as this method is intended to be called
* before validation.
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestAutoGenerationOfRecordKeys.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestAutoGenerationOfRecordKeys.scala
index 89a232b5f99..7a9f5b27ead 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestAutoGenerationOfRecordKeys.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestAutoGenerationOfRecordKeys.scala
@@ -21,28 +21,26 @@ package org.apache.hudi.functional
import org.apache.hadoop.fs.FileSystem
import org.apache.hudi.HoodieConversionUtils.toJavaOption
-import org.apache.hudi.common.config.HoodieMetadataConfig
-import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType,
WriteOperationType}
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
-import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType}
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.common.util
-import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.ExceptionUtil.getRootCause
import org.apache.hudi.exception.{HoodieException, HoodieKeyGeneratorException}
import org.apache.hudi.functional.CommonOptionUtils._
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
-import org.apache.hudi.keygen.{ComplexKeyGenerator, KeyGenUtils,
NonpartitionedKeyGenerator, SimpleKeyGenerator, TimestampBasedKeyGenerator}
import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
+import org.apache.hudi.keygen.{ComplexKeyGenerator,
NonpartitionedKeyGenerator, SimpleKeyGenerator, TimestampBasedKeyGenerator}
import org.apache.hudi.testutils.HoodieSparkClientTestBase
import org.apache.hudi.util.JFunction
import org.apache.hudi.{DataSourceWriteOptions, HoodieDataSourceHelpers,
ScalaAssertionSupport}
+import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
import org.apache.spark.sql.{SaveMode, SparkSession, SparkSessionExtensions}
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{CsvSource, EnumSource}
+import org.junit.jupiter.params.provider.CsvSource
import java.util.function.Consumer
import scala.collection.JavaConversions._
@@ -124,10 +122,6 @@ class TestAutoGenerationOfRecordKeys extends
HoodieSparkClientTestBase with Scal
options = options --
Seq(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())
}
- // add partition Id and instant time
- options += KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG -> "1"
- options += KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG -> "100"
-
// NOTE: In this test we deliberately removing record-key configuration
// to validate Hudi is handling this case appropriately
val writeOpts = options -- Seq(DataSourceWriteOptions.RECORDKEY_FIELD.key)
@@ -176,14 +170,14 @@ class TestAutoGenerationOfRecordKeys extends
HoodieSparkClientTestBase with Scal
assertEquals(10, recordKeys.size)
// validate entire batch is present in snapshot read
- val expectedInputDf =
inputDF.union(inputDF2).drop("partition","rider","_hoodie_is_deleted")
- val actualDf = readDF.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala:
_*).drop("partition","rider","_hoodie_is_deleted")
+ val expectedInputDf = inputDF.union(inputDF2).drop("partition", "rider",
"_hoodie_is_deleted")
+ val actualDf = readDF.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala:
_*).drop("partition", "rider", "_hoodie_is_deleted")
assertEquals(expectedInputDf.except(actualDf).count, 0)
}
@ParameterizedTest
@CsvSource(value = Array(
-
"hoodie.populate.meta.fields,false","hoodie.combine.before.insert,true","hoodie.datasource.write.insert.drop.duplicates,true"
+ "hoodie.populate.meta.fields,false", "hoodie.combine.before.insert,true",
"hoodie.datasource.write.insert.drop.duplicates,true"
))
def testRecordKeysAutoGenInvalidParams(configKey: String, configValue:
String): Unit = {
val (writeOpts, _) = getWriterReaderOpts(HoodieRecordType.AVRO)
@@ -192,10 +186,6 @@ class TestAutoGenerationOfRecordKeys extends
HoodieSparkClientTestBase with Scal
// to validate Hudi is handling this case appropriately
var opts = writeOpts -- Seq(DataSourceWriteOptions.RECORDKEY_FIELD.key)
- // add partition Id and instant time
- opts += KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG -> "1"
- opts += KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG -> "100"
-
// Insert Operation
val records = recordsToStrings(dataGen.generateInserts("000", 1)).toList
val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
@@ -211,7 +201,6 @@ class TestAutoGenerationOfRecordKeys extends
HoodieSparkClientTestBase with Scal
assertTrue(getRootCause(e).getMessage.contains(configKey + " is not
supported with auto generation of record keys"))
}
-
@Test
def testRecordKeysAutoGenEnableToDisable(): Unit = {
val (vanillaWriteOpts, readOpts) =
getWriterReaderOpts(HoodieRecordType.AVRO)
@@ -228,11 +217,6 @@ class TestAutoGenerationOfRecordKeys extends
HoodieSparkClientTestBase with Scal
val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
inputDF.cache
-
- // add partition Id and instant time
- writeOpts += KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG -> "1"
- writeOpts += KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG -> "100"
-
//
// Step #1: Persist first batch with auto-gen'd record-keys
//
@@ -277,4 +261,56 @@ class TestAutoGenerationOfRecordKeys extends
HoodieSparkClientTestBase with Scal
val snapshot0 = spark.read.format("hudi").load(basePath)
assertEquals(5, snapshot0.count())
}
+
+ @Test
+ def testUpsertsAndDeletesWithPkLess(): Unit = {
+ val (vanillaWriteOpts, readOpts) =
getWriterReaderOpts(HoodieRecordType.AVRO)
+
+ var options: Map[String, String] = vanillaWriteOpts ++ Map(
+ DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key ->
classOf[SimpleKeyGenerator].getCanonicalName)
+
+ var writeOpts = options -- Seq(DataSourceWriteOptions.RECORDKEY_FIELD.key)
+
+ // Insert Operation
+ val records = recordsToStrings(dataGen.generateInserts("000", 20)).toList
+ val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+ inputDF.cache
+
+ inputDF.write.format("hudi")
+ .options(writeOpts)
+ .option(DataSourceWriteOptions.OPERATION.key, "insert")
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+
+ assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
+
+ val snapshotDf = spark.read.format("hudi").load(basePath)
+ snapshotDf.cache()
+ assertEquals(snapshotDf.count(), 20)
+
+ val updateDf = snapshotDf.limit(5).withColumn("rider", lit("rider-123456"))
+ updateDf.write.format("hudi")
+ .options(writeOpts)
+ .mode(SaveMode.Append)
+ .save(basePath)
+
+ val snapshotDf1 = spark.read.format("hudi").load(basePath)
+ snapshotDf1.cache()
+
+ assertEquals(20, snapshotDf1.count())
+ assertEquals(5, snapshotDf1.filter("rider == 'rider-123456'").count())
+
+ // delete the same 5 records.
+ snapshotDf1.filter("rider == 'rider-123456'")
+ .write.format("hudi")
+ .options(writeOpts)
+ .option(DataSourceWriteOptions.OPERATION.key, "delete")
+ .mode(SaveMode.Append)
+ .save(basePath)
+
+ val snapshotDf2 = spark.read.format("hudi").load(basePath)
+ snapshotDf2.cache()
+ assertEquals(15, snapshotDf2.count())
+ assertEquals(0, snapshotDf2.filter("rider == 'rider-123456'").count())
+ }
}