This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new f2fdf8a8052 [HUDI-6315] Feature flag for disabling prepped merge.
(#9203)
f2fdf8a8052 is described below
commit f2fdf8a805285d9f79444c3247726da8e244fbfb
Author: Amrish Lal <[email protected]>
AuthorDate: Thu Jul 20 16:22:35 2023 -0700
[HUDI-6315] Feature flag for disabling prepped merge. (#9203)
- Add user-defined feature flag for disabling prepped merge
("hoodie.spark.sql.optimized.writes.enable")
---
.../apache/hudi/config/HoodieInternalConfig.java | 8 -
.../org/apache/hudi/config/HoodieWriteConfig.java | 6 +
.../apache/hudi/index/SparkHoodieIndexFactory.java | 6 +-
.../factory/HoodieSparkKeyGeneratorFactory.java | 4 +-
.../scala/org/apache/hudi/DataSourceOptions.scala | 11 +-
.../main/scala/org/apache/hudi/DefaultSource.scala | 10 +-
.../org/apache/hudi/HoodieCreateRecordUtils.scala | 26 +--
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 23 +-
.../scala/org/apache/hudi/HoodieWriterUtils.scala | 1 -
.../hudi/command/DeleteHoodieTableCommand.scala | 12 +-
.../hudi/command/MergeIntoHoodieTableCommand.scala | 23 +-
.../hudi/command/UpdateHoodieTableCommand.scala | 12 +-
.../hudi-spark/src/test/java/HoodieJavaApp.java | 2 +-
.../apache/spark/sql/hudi/TestDeleteTable.scala | 10 +-
.../apache/spark/sql/hudi/TestMergeIntoTable.scala | 236 +++++++++++----------
.../TestMergeIntoTableWithNonRecordKeyField.scala | 207 +++++++++---------
.../apache/spark/sql/hudi/TestUpdateTable.scala | 11 +-
17 files changed, 322 insertions(+), 286 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieInternalConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieInternalConfig.java
index a1c575c4f41..797df196441 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieInternalConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieInternalConfig.java
@@ -46,14 +46,6 @@ public class HoodieInternalConfig extends HoodieConfig {
.withDocumentation("For SQL operations, if enables bulk_insert
operation, "
+ "this configure will take effect to decide overwrite whole table
or partitions specified");
- public static final ConfigProperty<Boolean> SQL_MERGE_INTO_WRITES =
ConfigProperty
- .key("hoodie.internal.sql.merge.into.writes")
- .defaultValue(false)
- .markAdvanced()
- .sinceVersion("0.14.0")
- .withDocumentation("This internal config is used by Merge Into SQL logic
only to mark such use case "
- + "and let the core components know it should handle the write
differently");
-
/**
* Returns if partition records are sorted or not.
*
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 5f594e553af..1581e21c070 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -721,6 +721,12 @@ public class HoodieWriteConfig extends HoodieConfig {
+ "The class must be a subclass of
`org.apache.hudi.callback.HoodieClientInitCallback`."
+ "By default, no Hudi client init callback is executed.");
+ /**
+ * Config key with boolean value that indicates whether records being written during a MERGE INTO Spark SQL
+ * operation are already prepped.
+ */
+ public static final String SPARK_SQL_MERGE_INTO_PREPPED_KEY =
"_hoodie.spark.sql.merge.into.prepped";
+
private ConsistencyGuardConfig consistencyGuardConfig;
private FileSystemRetryConfig fileSystemRetryConfig;
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndexFactory.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndexFactory.java
index c908f40e4f0..eebaf0f05ba 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndexFactory.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndexFactory.java
@@ -21,7 +21,6 @@ package org.apache.hudi.index;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.config.HoodieInternalConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieIndexException;
@@ -44,9 +43,8 @@ import java.io.IOException;
*/
public final class SparkHoodieIndexFactory {
public static HoodieIndex createIndex(HoodieWriteConfig config) {
- boolean mergeIntoWrites =
config.getProps().getBoolean(HoodieInternalConfig.SQL_MERGE_INTO_WRITES.key(),
- HoodieInternalConfig.SQL_MERGE_INTO_WRITES.defaultValue());
- if (mergeIntoWrites) {
+ boolean sqlMergeIntoPrepped =
config.getProps().getBoolean(HoodieWriteConfig.SPARK_SQL_MERGE_INTO_PREPPED_KEY,
false);
+ if (sqlMergeIntoPrepped) {
return new HoodieInternalProxyIndex(config);
}
// first use index class config to create index.
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java
index 235bd63b99f..cceaad0a785 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java
@@ -45,7 +45,7 @@ import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
-import static
org.apache.hudi.config.HoodieInternalConfig.SQL_MERGE_INTO_WRITES;
+import static
org.apache.hudi.config.HoodieWriteConfig.SPARK_SQL_MERGE_INTO_PREPPED_KEY;
import static org.apache.hudi.config.HoodieWriteConfig.KEYGENERATOR_TYPE;
import static org.apache.hudi.keygen.KeyGenUtils.inferKeyGeneratorType;
@@ -80,7 +80,7 @@ public class HoodieSparkKeyGeneratorFactory {
boolean autoRecordKeyGen = KeyGenUtils.enableAutoGenerateRecordKeys(props)
//Need to prevent overwriting the keygen for spark sql merge into
because we need to extract
//the recordkey from the meta cols if it exists. Sql keygen will use
pkless keygen if needed.
- && !props.getBoolean(SQL_MERGE_INTO_WRITES.key(),
SQL_MERGE_INTO_WRITES.defaultValue());
+ && !props.getBoolean(SPARK_SQL_MERGE_INTO_PREPPED_KEY, false);
try {
KeyGenerator keyGenerator = (KeyGenerator)
ReflectionUtils.loadClass(keyGeneratorClass, props);
if (autoRecordKeyGen) {
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 841f1088db6..6fb84932c13 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -311,9 +311,10 @@ object DataSourceWriteOptions {
.withDocumentation("The table type for the underlying data, for this
write. This can’t change between writes.")
/**
- * Config key with boolean value that indicates whether record being written
is already prepped.
+ * Config key with boolean value that indicates whether records being written during UPDATE or DELETE Spark SQL
+ * operations are already prepped.
*/
- val DATASOURCE_WRITE_PREPPED_KEY = "_hoodie.datasource.write.prepped";
+ val SPARK_SQL_WRITES_PREPPED_KEY = "_hoodie.spark.sql.writes.prepped";
/**
* May be derive partition path from incoming df if not explicitly set.
@@ -641,12 +642,12 @@ object DataSourceWriteOptions {
val DROP_PARTITION_COLUMNS: ConfigProperty[java.lang.Boolean] =
HoodieTableConfig.DROP_PARTITION_COLUMNS
- val ENABLE_OPTIMIZED_SQL_WRITES: ConfigProperty[String] = ConfigProperty
- .key("hoodie.spark.sql.writes.optimized.enable")
+ val SPARK_SQL_OPTIMIZED_WRITES: ConfigProperty[String] = ConfigProperty
+ .key("hoodie.spark.sql.optimized.writes.enable")
.defaultValue("true")
.markAdvanced()
.sinceVersion("0.14.0")
- .withDocumentation("Controls whether spark sql optimized update is
enabled.")
+ .withDocumentation("Controls whether spark sql prepped update, delete, and
merge are enabled.")
/** @deprecated Use {@link HIVE_ASSUME_DATE_PARTITION} and its methods
instead */
@Deprecated
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index 1b961ba411a..8fe36ec71be 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -19,7 +19,7 @@ package org.apache.hudi
import org.apache.hadoop.fs.Path
import org.apache.hudi.DataSourceReadOptions._
-import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL,
DATASOURCE_WRITE_PREPPED_KEY, OPERATION, STREAMING_CHECKPOINT_IDENTIFIER}
+import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL,
SPARK_SQL_WRITES_PREPPED_KEY, OPERATION, STREAMING_CHECKPOINT_IDENTIFIER}
import org.apache.hudi.cdc.CDCRelation
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE,
MERGE_ON_READ}
@@ -29,9 +29,7 @@ import org.apache.hudi.common.table.{HoodieTableMetaClient,
TableSchemaResolver}
import org.apache.hudi.common.util.ConfigUtils
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
-import org.apache.hudi.config.HoodieInternalConfig
-import org.apache.hudi.config.HoodieInternalConfig.SQL_MERGE_INTO_WRITES
-import org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE
+import org.apache.hudi.config.HoodieWriteConfig.{WRITE_CONCURRENCY_MODE,
SPARK_SQL_MERGE_INTO_PREPPED_KEY}
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.util.PathUtils
import org.apache.spark.sql.execution.streaming.{Sink, Source}
@@ -146,9 +144,7 @@ class DefaultSource extends RelationProvider
mode: SaveMode,
optParams: Map[String, String],
rawDf: DataFrame): BaseRelation = {
- val df = if (optParams.getOrDefault(DATASOURCE_WRITE_PREPPED_KEY,
- optParams.getOrDefault(SQL_MERGE_INTO_WRITES.key(),
SQL_MERGE_INTO_WRITES.defaultValue().toString))
- .equalsIgnoreCase("true")) {
+ val df = if (optParams.getOrDefault(SPARK_SQL_WRITES_PREPPED_KEY,
"false").toBoolean || optParams.getOrDefault(SPARK_SQL_MERGE_INTO_PREPPED_KEY,
"false").toBoolean) {
rawDf // Don't remove meta columns for prepped write.
} else {
rawDf.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala: _*)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala
index 09d6c7a106a..d59edc64bf8 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala
@@ -57,7 +57,7 @@ object HoodieCreateRecordUtils {
operation: WriteOperationType,
instantTime: String,
isPrepped: Boolean,
- mergeIntoWrites: Boolean)
+ sqlMergeIntoPrepped: Boolean)
def createHoodieRecordRdd(args: createHoodieRecordRddArgs) = {
val df = args.df
@@ -70,7 +70,7 @@ object HoodieCreateRecordUtils {
val operation = args.operation
val instantTime = args.instantTime
val isPrepped = args.isPrepped
- val mergeIntoWrites = args.mergeIntoWrites
+ val sqlMergeIntoPrepped = args.sqlMergeIntoPrepped
val shouldDropPartitionColumns =
config.getBoolean(DataSourceWriteOptions.DROP_PARTITION_COLUMNS)
val recordType = config.getRecordMerger.getRecordType
@@ -127,8 +127,8 @@ object HoodieCreateRecordUtils {
}
val (hoodieKey: HoodieKey, recordLocation:
Option[HoodieRecordLocation]) =
HoodieCreateRecordUtils.getHoodieKeyAndMaybeLocationFromAvroRecord(keyGenerator,
avroRec,
- isPrepped, mergeIntoWrites)
- val avroRecWithoutMeta: GenericRecord = if (isPrepped ||
mergeIntoWrites) {
+ isPrepped, sqlMergeIntoPrepped)
+ val avroRecWithoutMeta: GenericRecord = if (isPrepped ||
sqlMergeIntoPrepped) {
HoodieAvroUtils.rewriteRecord(avroRec,
HoodieAvroUtils.removeMetadataFields(dataFileSchema))
} else {
avroRec
@@ -184,7 +184,7 @@ object HoodieCreateRecordUtils {
validateMetaFieldsInSparkRecords(sourceStructType)
validatePreppedRecord = false
}
- val (key: HoodieKey, recordLocation: Option[HoodieRecordLocation])
=
HoodieCreateRecordUtils.getHoodieKeyAndMayBeLocationFromSparkRecord(sparkKeyGenerator,
sourceRow, sourceStructType, isPrepped, mergeIntoWrites)
+ val (key: HoodieKey, recordLocation: Option[HoodieRecordLocation])
=
HoodieCreateRecordUtils.getHoodieKeyAndMayBeLocationFromSparkRecord(sparkKeyGenerator,
sourceRow, sourceStructType, isPrepped, sqlMergeIntoPrepped)
val targetRow = finalStructTypeRowWriter(sourceRow)
var hoodieSparkRecord = new HoodieSparkRecord(key, targetRow,
dataFileStructType, false)
@@ -220,8 +220,8 @@ object HoodieCreateRecordUtils {
}
def getHoodieKeyAndMaybeLocationFromAvroRecord(keyGenerator:
Option[BaseKeyGenerator], avroRec: GenericRecord,
- isPrepped: Boolean,
mergeIntoWrites: Boolean): (HoodieKey, Option[HoodieRecordLocation]) = {
- //use keygen for mergeIntoWrites recordKey and partitionPath because the
keygenerator handles
+ isPrepped: Boolean,
sqlMergeIntoPrepped: Boolean): (HoodieKey, Option[HoodieRecordLocation]) = {
+ //use keygen for sqlMergeIntoPrepped recordKey and partitionPath because
the keygenerator handles
//fetching from the meta fields if they are populated and otherwise doing
keygen
val recordKey = if (isPrepped) {
avroRec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString
@@ -236,13 +236,13 @@ object HoodieCreateRecordUtils {
}
val hoodieKey = new HoodieKey(recordKey, partitionPath)
- val instantTime: Option[String] = if (isPrepped || mergeIntoWrites) {
+ val instantTime: Option[String] = if (isPrepped || sqlMergeIntoPrepped) {
Option(avroRec.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD)).map(_.toString)
}
else {
None
}
- val fileName: Option[String] = if (isPrepped || mergeIntoWrites) {
+ val fileName: Option[String] = if (isPrepped || sqlMergeIntoPrepped) {
Option(avroRec.get(HoodieRecord.FILENAME_METADATA_FIELD)).map(_.toString)
}
else {
@@ -259,8 +259,8 @@ object HoodieCreateRecordUtils {
def getHoodieKeyAndMayBeLocationFromSparkRecord(sparkKeyGenerator:
Option[SparkKeyGeneratorInterface],
sourceRow: InternalRow,
schema: StructType,
- isPrepped: Boolean,
mergeIntoWrites: Boolean): (HoodieKey, Option[HoodieRecordLocation]) = {
- //use keygen for mergeIntoWrites recordKey and partitionPath because the
keygenerator handles
+ isPrepped: Boolean,
sqlMergeIntoPrepped: Boolean): (HoodieKey, Option[HoodieRecordLocation]) = {
+ //use keygen for sqlMergeIntoPrepped recordKey and partitionPath because
the keygenerator handles
//fetching from the meta fields if they are populated and otherwise doing
keygen
val recordKey = if (isPrepped) {
sourceRow.getString(HoodieRecord.RECORD_KEY_META_FIELD_ORD)
@@ -274,13 +274,13 @@ object HoodieCreateRecordUtils {
sparkKeyGenerator.get.getPartitionPath(sourceRow, schema).toString
}
- val instantTime: Option[String] = if (isPrepped || mergeIntoWrites) {
+ val instantTime: Option[String] = if (isPrepped || sqlMergeIntoPrepped) {
Option(sourceRow.getString(HoodieRecord.COMMIT_TIME_METADATA_FIELD_ORD))
} else {
None
}
- val fileName: Option[String] = if (isPrepped || mergeIntoWrites) {
+ val fileName: Option[String] = if (isPrepped || sqlMergeIntoPrepped) {
Option(sourceRow.getString(HoodieRecord.FILENAME_META_FIELD_ORD))
} else {
None
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 272b55b7f20..ca1359578d1 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -45,7 +45,7 @@ import org.apache.hudi.common.table.{HoodieTableConfig,
HoodieTableMetaClient, T
import org.apache.hudi.common.util.{CommitUtils, StringUtils, Option =>
HOption}
import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH,
INDEX_CLASS_NAME}
import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig}
-import org.apache.hudi.config.HoodieInternalConfig.SQL_MERGE_INTO_WRITES
+import
org.apache.hudi.config.HoodieWriteConfig.SPARK_SQL_MERGE_INTO_PREPPED_KEY
import org.apache.hudi.exception.{HoodieException,
SchemaCompatibilityException}
import org.apache.hudi.hive.{HiveSyncConfigHolder, HiveSyncTool}
import org.apache.hudi.index.HoodieIndex
@@ -91,6 +91,14 @@ object HoodieSparkSqlWriter {
ConfigProperty.key("hoodie.internal.write.schema.canonicalize.nullable")
.defaultValue(true)
+ /**
+ * MERGE INTO from Spark SQL needs some special handling; for example, schema validation should be disabled
+ * for writes from MERGE INTO. This config is used for internal purposes.
+ */
+ val SQL_MERGE_INTO_WRITES: ConfigProperty[Boolean] =
+ ConfigProperty.key("hoodie.internal.sql.merge.into.writes")
+ .defaultValue(false)
+
/**
* For spark streaming use-cases, holds the batch Id.
*/
@@ -250,7 +258,7 @@ object HoodieSparkSqlWriter {
case WriteOperationType.DELETE | WriteOperationType.DELETE_PREPPED =>
val genericRecords = HoodieSparkUtils.createRdd(df,
avroRecordName, avroRecordNamespace)
// Convert to RDD[HoodieKey]
- val isPrepped =
hoodieConfig.getBooleanOrDefault(DATASOURCE_WRITE_PREPPED_KEY, false)
+ val isPrepped =
hoodieConfig.getBooleanOrDefault(SPARK_SQL_WRITES_PREPPED_KEY, false)
val keyGenerator: Option[BaseKeyGenerator] = if (isPrepped) {
None
} else {
@@ -348,10 +356,9 @@ object HoodieSparkSqlWriter {
}
// Remove meta columns from writerSchema if isPrepped is true.
- val isPrepped =
hoodieConfig.getBooleanOrDefault(DATASOURCE_WRITE_PREPPED_KEY, false)
- val mergeIntoWrites =
parameters.getOrDefault(SQL_MERGE_INTO_WRITES.key(),
- SQL_MERGE_INTO_WRITES.defaultValue.toString).toBoolean
- val processedDataSchema = if (isPrepped || mergeIntoWrites) {
+ val isPrepped =
hoodieConfig.getBooleanOrDefault(SPARK_SQL_WRITES_PREPPED_KEY, false)
+ val sqlMergeIntoPrepped =
parameters.getOrDefault(SPARK_SQL_MERGE_INTO_PREPPED_KEY, "false").toBoolean
+ val processedDataSchema = if (isPrepped || sqlMergeIntoPrepped) {
HoodieAvroUtils.removeMetadataFields(dataFileSchema)
} else {
dataFileSchema
@@ -388,7 +395,7 @@ object HoodieSparkSqlWriter {
val hoodieRecords =
HoodieCreateRecordUtils.createHoodieRecordRdd(HoodieCreateRecordUtils.createHoodieRecordRddArgs(df,
writeConfig, parameters, avroRecordName, avroRecordNamespace,
writerSchema,
- processedDataSchema, operation, instantTime, isPrepped,
mergeIntoWrites))
+ processedDataSchema, operation, instantTime, isPrepped,
sqlMergeIntoPrepped))
val dedupedHoodieRecords =
if (hoodieConfig.getBoolean(INSERT_DROP_DUPS)) {
@@ -453,7 +460,7 @@ object HoodieSparkSqlWriter {
// in the table's one we want to proceed aligning nullability
constraints w/ the table's schema
val shouldCanonicalizeNullable =
opts.getOrDefault(CANONICALIZE_NULLABLE.key,
CANONICALIZE_NULLABLE.defaultValue.toString).toBoolean
- val mergeIntoWrites =
opts.getOrDefault(HoodieInternalConfig.SQL_MERGE_INTO_WRITES.key(),
+ val mergeIntoWrites = opts.getOrDefault(SQL_MERGE_INTO_WRITES.key(),
SQL_MERGE_INTO_WRITES.defaultValue.toString).toBoolean
val canonicalizedSourceSchema = if (shouldCanonicalizeNullable) {
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index cf8d85f704e..405e761635a 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -82,7 +82,6 @@ object HoodieWriterUtils {
hoodieConfig.setDefaultValue(RECONCILE_SCHEMA)
hoodieConfig.setDefaultValue(DROP_PARTITION_COLUMNS)
hoodieConfig.setDefaultValue(KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED)
- hoodieConfig.setDefaultValue(ENABLE_OPTIMIZED_SQL_WRITES)
Map() ++ hoodieConfig.getProps.asScala ++ globalProps ++
DataSourceOptionsHelper.translateConfigurations(parameters)
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
index d10b3d529f5..55f2ebb8ac6 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.hudi.command
-import org.apache.hudi.DataSourceWriteOptions.{DATASOURCE_WRITE_PREPPED_KEY,
ENABLE_OPTIMIZED_SQL_WRITES}
+import org.apache.hudi.DataSourceWriteOptions.{SPARK_SQL_WRITES_PREPPED_KEY,
SPARK_SQL_OPTIMIZED_WRITES}
import org.apache.hudi.SparkAdapterSupport
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
@@ -40,8 +40,8 @@ case class DeleteHoodieTableCommand(dft: DeleteFromTable)
extends HoodieLeafRunn
val condition = sparkAdapter.extractDeleteCondition(dft)
- val targetLogicalPlan = if
(sparkSession.sqlContext.conf.getConfString(ENABLE_OPTIMIZED_SQL_WRITES.key()
- , ENABLE_OPTIMIZED_SQL_WRITES.defaultValue()) == "true") {
+ val targetLogicalPlan = if
(sparkSession.sqlContext.conf.getConfString(SPARK_SQL_OPTIMIZED_WRITES.key()
+ , SPARK_SQL_OPTIMIZED_WRITES.defaultValue()) == "true") {
dft.table
} else {
stripMetaFieldAttributes(dft.table)
@@ -53,9 +53,9 @@ case class DeleteHoodieTableCommand(dft: DeleteFromTable)
extends HoodieLeafRunn
targetLogicalPlan
}
- val config = if
(sparkSession.sqlContext.conf.getConfString(ENABLE_OPTIMIZED_SQL_WRITES.key()
- , ENABLE_OPTIMIZED_SQL_WRITES.defaultValue()) == "true") {
- buildHoodieDeleteTableConfig(catalogTable, sparkSession) +
(DATASOURCE_WRITE_PREPPED_KEY -> "true")
+ val config = if
(sparkSession.sqlContext.conf.getConfString(SPARK_SQL_OPTIMIZED_WRITES.key()
+ , SPARK_SQL_OPTIMIZED_WRITES.defaultValue()) == "true") {
+ buildHoodieDeleteTableConfig(catalogTable, sparkSession) +
(SPARK_SQL_WRITES_PREPPED_KEY -> "true")
} else {
buildHoodieDeleteTableConfig(catalogTable, sparkSession)
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index d4c72e5bfb4..eba75c95452 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -25,7 +25,7 @@ import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.common.model.HoodieAvroRecordMerger
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.config.HoodieWriteConfig.{AVRO_SCHEMA_VALIDATE_ENABLE,
SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP, TBL_NAME}
-import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig}
+import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hive.HiveSyncConfigHolder
import org.apache.hudi.sync.common.HoodieSyncConfig
@@ -342,7 +342,12 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
val tableMetaCols = mergeInto.targetTable.output.filter(a =>
isMetaField(a.name))
val joinData =
sparkAdapter.getCatalystPlanUtils.createMITJoin(mergeInto.sourceTable,
mergeInto.targetTable, LeftOuter, Some(mergeInto.mergeCondition), "NONE")
val incomingDataCols =
joinData.output.filterNot(mergeInto.targetTable.outputSet.contains)
- val projectedJoinPlan = Project(tableMetaCols ++ incomingDataCols,
joinData)
+ val projectedJoinPlan = if
(sparkSession.sqlContext.conf.getConfString(SPARK_SQL_OPTIMIZED_WRITES.key(),
SPARK_SQL_OPTIMIZED_WRITES.defaultValue()) == "true") {
+ Project(tableMetaCols ++ incomingDataCols, joinData)
+ } else {
+ Project(incomingDataCols, joinData)
+ }
+
val projectedJoinOutput = projectedJoinPlan.output
val requiredAttributesMap = recordKeyAttributeToConditionExpression ++
preCombineAttributeAssociatedExpression
@@ -617,6 +622,15 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable,
tableConfig)
+ val enableOptimizedMerge =
sparkSession.sqlContext.conf.getConfString(SPARK_SQL_OPTIMIZED_WRITES.key(),
+ SPARK_SQL_OPTIMIZED_WRITES.defaultValue())
+
+ val keyGeneratorClassName = if (enableOptimizedMerge == "true") {
+ classOf[MergeIntoKeyGenerator].getCanonicalName
+ } else {
+ classOf[SqlKeyGenerator].getCanonicalName
+ }
+
val overridingOpts = Map(
"path" -> path,
RECORDKEY_FIELD.key -> tableConfig.getRawRecordKeyFieldProp,
@@ -625,7 +639,7 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
HIVE_STYLE_PARTITIONING.key ->
tableConfig.getHiveStylePartitioningEnable,
URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
- KEYGENERATOR_CLASS_NAME.key ->
classOf[MergeIntoKeyGenerator].getCanonicalName,
+ KEYGENERATOR_CLASS_NAME.key -> keyGeneratorClassName,
SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME ->
tableConfig.getKeyGeneratorClassName,
HoodieSyncConfig.META_SYNC_ENABLED.key ->
hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key ->
hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
@@ -648,7 +662,8 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
RECONCILE_SCHEMA.key -> "false",
CANONICALIZE_NULLABLE.key -> "false",
SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key -> "true",
- HoodieInternalConfig.SQL_MERGE_INTO_WRITES.key -> "true",
+ HoodieSparkSqlWriter.SQL_MERGE_INTO_WRITES.key -> "true",
+ HoodieWriteConfig.SPARK_SQL_MERGE_INTO_PREPPED_KEY ->
enableOptimizedMerge,
HoodieWriteConfig.COMBINE_BEFORE_UPSERT.key() ->
(!StringUtils.isNullOrEmpty(preCombineField)).toString
)
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala
index 7d6d5f39bb1..e35e4939f04 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.hudi.command
-import org.apache.hudi.DataSourceWriteOptions.{DATASOURCE_WRITE_PREPPED_KEY,
ENABLE_OPTIMIZED_SQL_WRITES}
+import org.apache.hudi.DataSourceWriteOptions.{SPARK_SQL_WRITES_PREPPED_KEY,
SPARK_SQL_OPTIMIZED_WRITES}
import org.apache.hudi.SparkAdapterSupport
import org.apache.spark.sql.HoodieCatalystExpressionUtils.attributeEquals
import org.apache.spark.sql._
@@ -44,8 +44,8 @@ case class UpdateHoodieTableCommand(ut: UpdateTable) extends
HoodieLeafRunnableC
case Assignment(attr: AttributeReference, value) => attr -> value
}
- val filteredOutput = if
(sparkSession.sqlContext.conf.getConfString(ENABLE_OPTIMIZED_SQL_WRITES.key()
- , ENABLE_OPTIMIZED_SQL_WRITES.defaultValue()) == "true") {
+ val filteredOutput = if
(sparkSession.sqlContext.conf.getConfString(SPARK_SQL_OPTIMIZED_WRITES.key()
+ , SPARK_SQL_OPTIMIZED_WRITES.defaultValue()) == "true") {
ut.table.output
} else {
removeMetaFields(ut.table.output)
@@ -63,10 +63,10 @@ case class UpdateHoodieTableCommand(ut: UpdateTable)
extends HoodieLeafRunnableC
val condition = ut.condition.getOrElse(TrueLiteral)
val filteredPlan = Filter(condition, Project(targetExprs, ut.table))
- val config = if
(sparkSession.sqlContext.conf.getConfString(ENABLE_OPTIMIZED_SQL_WRITES.key()
- , ENABLE_OPTIMIZED_SQL_WRITES.defaultValue()) == "true") {
+ val config = if
(sparkSession.sqlContext.conf.getConfString(SPARK_SQL_OPTIMIZED_WRITES.key()
+ , SPARK_SQL_OPTIMIZED_WRITES.defaultValue()) == "true") {
// Set config to show that this is a prepped write.
- buildHoodieConfig(catalogTable) + (DATASOURCE_WRITE_PREPPED_KEY ->
"true")
+ buildHoodieConfig(catalogTable) + (SPARK_SQL_WRITES_PREPPED_KEY ->
"true")
} else {
buildHoodieConfig(catalogTable)
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java
b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java
index 448215a9b0a..844f134ed83 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java
@@ -161,7 +161,7 @@ public class HoodieJavaApp {
.option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE().key(), "false")
.option(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE().key(), "true")
.option(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE().key(), "true")
- .option(DataSourceWriteOptions.ENABLE_OPTIMIZED_SQL_WRITES().key(),
"true")
+ .option(DataSourceWriteOptions.SPARK_SQL_OPTIMIZED_WRITES().key(),
DataSourceWriteOptions.SPARK_SQL_OPTIMIZED_WRITES().defaultValue())
// This will remove any existing data at path below, and create a
.mode(SaveMode.Overwrite);
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteTable.scala
index 6233884a63e..bc87405b9f9 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteTable.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteTable.scala
@@ -26,7 +26,7 @@ class TestDeleteTable extends HoodieSparkSqlTestBase {
test("Test Delete Table") {
withTempDir { tmp =>
- Seq(true, false).foreach { optimizedSqlEnabled =>
+ Seq(true, false).foreach { sparkSqlOptimizedWrites =>
Seq("cow", "mor").foreach { tableType =>
val tableName = generateTableName
// create table
@@ -47,7 +47,7 @@ class TestDeleteTable extends HoodieSparkSqlTestBase {
""".stripMargin)
// test with optimized sql writes enabled / disabled.
- spark.sql(s"set
hoodie.spark.sql.writes.optimized.enable=$optimizedSqlEnabled")
+ spark.sql(s"set
${SPARK_SQL_OPTIMIZED_WRITES.key()}=$sparkSqlOptimizedWrites")
// insert data to table
spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
@@ -97,7 +97,7 @@ class TestDeleteTable extends HoodieSparkSqlTestBase {
""".stripMargin)
// test with optimized sql writes enabled.
- spark.sql(s"set hoodie.spark.sql.writes.optimized.enable=true")
+ spark.sql(s"set ${SPARK_SQL_OPTIMIZED_WRITES.key()}=true")
// insert data to table
spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
@@ -279,7 +279,7 @@ class TestDeleteTable extends HoodieSparkSqlTestBase {
Seq(false, true).foreach { urlencode =>
test(s"Test Delete single-partition table' partitions, urlencode:
$urlencode") {
- Seq(true, false).foreach { optimizedSqlEnabled =>
+ Seq(true, false).foreach { sparkSqlOptimizedWrites =>
withTempDir { tmp =>
val tableName = generateTableName
val tablePath = s"${tmp.getCanonicalPath}/$tableName"
@@ -308,7 +308,7 @@ class TestDeleteTable extends HoodieSparkSqlTestBase {
|""".stripMargin)
// test with optimized sql writes enabled / disabled.
- spark.sql(s"set
hoodie.spark.sql.writes.optimized.enable=$optimizedSqlEnabled")
+ spark.sql(s"set
${SPARK_SQL_OPTIMIZED_WRITES.key()}=$sparkSqlOptimizedWrites")
// delete 2021-10-01 partition
if (urlencode) {
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
index 7ee3e838a2e..63adacbf129 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.hudi
+import org.apache.hudi.DataSourceWriteOptions.SPARK_SQL_OPTIMIZED_WRITES
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.{DataSourceReadOptions, HoodieDataSourceHelpers,
HoodieSparkUtils, ScalaAssertionSupport}
import org.apache.spark.sql.internal.SQLConf
@@ -24,94 +25,99 @@ import org.apache.spark.sql.internal.SQLConf
class TestMergeIntoTable extends HoodieSparkSqlTestBase with
ScalaAssertionSupport {
test("Test MergeInto Basic") {
- withRecordType()(withTempDir { tmp =>
- spark.sql("set hoodie.payload.combined.schema.validate = true")
- val tableName = generateTableName
- // Create table
- spark.sql(
- s"""
- |create table $tableName (
- | id int,
- | name string,
- | price double,
- | ts long
- |) using hudi
- | location '${tmp.getCanonicalPath}'
- | tblproperties (
- | primaryKey ='id',
- | preCombineField = 'ts'
- | )
+ Seq(true, false).foreach { sparkSqlOptimizedWrites =>
+ withRecordType()(withTempDir { tmp =>
+ spark.sql("set hoodie.payload.combined.schema.validate = false")
+ val tableName = generateTableName
+ // Create table
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | location '${tmp.getCanonicalPath}'
+ | tblproperties (
+ | primaryKey ='id',
+ | preCombineField = 'ts'
+ | )
""".stripMargin)
- // First merge with a extra input field 'flag' (insert a new record)
- spark.sql(
- s"""
- | merge into $tableName
- | using (
- | select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '1' as
flag
- | ) s0
- | on s0.id = $tableName.id
- | when matched and flag = '1' then update set
- | id = s0.id, name = s0.name, price = s0.price, ts = s0.ts
- | when not matched and flag = '1' then insert *
+ // test with optimized sql merge enabled / disabled.
+ spark.sql(s"set
${SPARK_SQL_OPTIMIZED_WRITES.key()}=$sparkSqlOptimizedWrites")
+
+ // First merge with a extra input field 'flag' (insert a new record)
+ spark.sql(
+ s"""
+ | merge into $tableName
+ | using (
+ | select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '1' as
flag
+ | ) s0
+ | on s0.id = $tableName.id
+ | when matched and flag = '1' then update set
+ | id = s0.id, name = s0.name, price = s0.price, ts = s0.ts
+ | when not matched and flag = '1' then insert *
""".stripMargin)
- checkAnswer(s"select id, name, price, ts from $tableName")(
- Seq(1, "a1", 10.0, 1000)
- )
+ checkAnswer(s"select id, name, price, ts from $tableName")(
+ Seq(1, "a1", 10.0, 1000)
+ )
- // Second merge (update the record)
- spark.sql(
- s"""
- | merge into $tableName
- | using (
- | select 1 as id, 'a1' as name, 10 as price, 1001 as ts
- | ) s0
- | on s0.id = $tableName.id
- | when matched then update set
- | id = s0.id, name = s0.name, price = s0.price + $tableName.price,
ts = s0.ts
- | when not matched then insert *
+ // Second merge (update the record)
+ spark.sql(
+ s"""
+ | merge into $tableName
+ | using (
+ | select 1 as id, 'a1' as name, 10 as price, 1001 as ts
+ | ) s0
+ | on s0.id = $tableName.id
+ | when matched then update set
+ | id = s0.id, name = s0.name, price = s0.price +
$tableName.price, ts = s0.ts
+ | when not matched then insert *
""".stripMargin)
- checkAnswer(s"select id, name, price, ts from $tableName")(
- Seq(1, "a1", 20.0, 1001)
- )
+ checkAnswer(s"select id, name, price, ts from $tableName")(
+ Seq(1, "a1", 20.0, 1001)
+ )
- // the third time merge (update & insert the record)
- spark.sql(
- s"""
- | merge into $tableName
- | using (
- | select * from (
- | select 1 as id, 'a1' as name, 10 as price, 1002 as ts
- | union all
- | select 2 as id, 'a2' as name, 12 as price, 1001 as ts
- | )
- | ) s0
- | on s0.id = $tableName.id
- | when matched then update set
- | id = s0.id, name = s0.name, price = s0.price + $tableName.price,
ts = s0.ts
- | when not matched and s0.id % 2 = 0 then insert *
+ // the third time merge (update & insert the record)
+ spark.sql(
+ s"""
+ | merge into $tableName
+ | using (
+ | select * from (
+ | select 1 as id, 'a1' as name, 10 as price, 1002 as ts
+ | union all
+ | select 2 as id, 'a2' as name, 12 as price, 1001 as ts
+ | )
+ | ) s0
+ | on s0.id = $tableName.id
+ | when matched then update set
+ | id = s0.id, name = s0.name, price = s0.price +
$tableName.price, ts = s0.ts
+ | when not matched and s0.id % 2 = 0 then insert *
""".stripMargin)
- checkAnswer(s"select id, name, price, ts from $tableName")(
- Seq(1, "a1", 30.0, 1002),
- Seq(2, "a2", 12.0, 1001)
- )
+ checkAnswer(s"select id, name, price, ts from $tableName")(
+ Seq(1, "a1", 30.0, 1002),
+ Seq(2, "a2", 12.0, 1001)
+ )
- // the fourth merge (delete the record)
- spark.sql(
- s"""
- | merge into $tableName
- | using (
- | select 1 as id, 'a1' as name, 12 as price, 1003 as ts
- | ) s0
- | on s0.id = $tableName.id
- | when matched and s0.id != 1 then update set
- | id = s0.id, name = s0.name, price = s0.price, ts = s0.ts
- | when matched and s0.id = 1 then delete
- | when not matched then insert *
+ // the fourth merge (delete the record)
+ spark.sql(
+ s"""
+ | merge into $tableName
+ | using (
+ | select 1 as id, 'a1' as name, 12 as price, 1003 as ts
+ | ) s0
+ | on s0.id = $tableName.id
+ | when matched and s0.id != 1 then update set
+ | id = s0.id, name = s0.name, price = s0.price, ts = s0.ts
+ | when matched and s0.id = 1 then delete
+ | when not matched then insert *
""".stripMargin)
- val cnt = spark.sql(s"select * from $tableName where id = 1").count()
- assertResult(0)(cnt)
- })
+ val cnt = spark.sql(s"select * from $tableName where id = 1").count()
+ assertResult(0)(cnt)
+ })
+ }
}
/**
@@ -1187,41 +1193,47 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase
with ScalaAssertionSuppo
}
test("Test MergeInto with partial insert") {
- withRecordType()(withTempDir {tmp =>
- spark.sql("set hoodie.payload.combined.schema.validate = true")
- // Create a partitioned mor table
- val tableName = generateTableName
- spark.sql(
- s"""
- | create table $tableName (
- | id bigint,
- | name string,
- | price double,
- | dt string
- | ) using hudi
- | tblproperties (
- | type = 'mor',
- | primaryKey = 'id'
- | )
- | partitioned by(dt)
- | location '${tmp.getCanonicalPath}'
+ Seq(true, false).foreach { sparkSqlOptimizedWrites =>
+ withRecordType()(withTempDir { tmp =>
+ spark.sql("set hoodie.payload.combined.schema.validate = true")
+ // Create a partitioned mor table
+ val tableName = generateTableName
+ spark.sql(
+ s"""
+ | create table $tableName (
+ | id bigint,
+ | name string,
+ | price double,
+ | dt string
+ | ) using hudi
+ | tblproperties (
+ | type = 'mor',
+ | primaryKey = 'id'
+ | )
+ | partitioned by(dt)
+ | location '${tmp.getCanonicalPath}'
""".stripMargin)
- spark.sql(s"insert into $tableName select 1, 'a1', 10, '2021-03-21'")
- spark.sql(
- s"""
- | merge into $tableName as t0
- | using (
- | select 2 as id, 'a2' as name, 10 as price, '2021-03-20' as dt
- | ) s0
- | on s0.id = t0.id
- | when not matched and s0.id % 2 = 0 then insert (id, name, dt)
- | values(s0.id, s0.name, s0.dt)
+ spark.sql(s"insert into $tableName select 1, 'a1', 10, '2021-03-21'")
+
+ // test with optimized sql merge enabled / disabled.
+ spark.sql(s"set
${SPARK_SQL_OPTIMIZED_WRITES.key()}=$sparkSqlOptimizedWrites")
+
+ spark.sql(
+ s"""
+ | merge into $tableName as t0
+ | using (
+ | select 2 as id, 'a2' as name, 10 as price, '2021-03-20' as dt
+ | ) s0
+ | on s0.id = t0.id
+ | when not matched and s0.id % 2 = 0 then insert (id, name, dt)
+ | values(s0.id, s0.name, s0.dt)
""".stripMargin)
- checkAnswer(s"select id, name, price, dt from $tableName order by id")(
- Seq(1, "a1", 10, "2021-03-21"),
- Seq(2, "a2", null, "2021-03-20")
- )
- })
+ checkAnswer(s"select id, name, price, dt from $tableName order by id")(
+ Seq(1, "a1", 10, "2021-03-21"),
+ Seq(2, "a2", null, "2021-03-20")
+ )
+ })
+ }
}
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTableWithNonRecordKeyField.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTableWithNonRecordKeyField.scala
index 69a40e868ac..dd1d00580dc 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTableWithNonRecordKeyField.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTableWithNonRecordKeyField.scala
@@ -17,127 +17,134 @@
package org.apache.spark.sql.hudi
+import org.apache.hudi.DataSourceWriteOptions.SPARK_SQL_OPTIMIZED_WRITES
import org.apache.hudi.{HoodieSparkUtils, ScalaAssertionSupport}
class TestMergeIntoTableWithNonRecordKeyField extends HoodieSparkSqlTestBase
with ScalaAssertionSupport {
test("Test Merge into extra cond") {
- withTempDir { tmp =>
- val tableName = generateTableName
- spark.sql(
- s"""
- |create table $tableName (
- | id int,
- | name string,
- | price double,
- | ts long
- |) using hudi
- | location '${tmp.getCanonicalPath}/$tableName'
- | tblproperties (
- | primaryKey ='id',
- | preCombineField = 'ts'
- | )
+ Seq(true, false).foreach { sparkSqlOptimizedWrites =>
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | location '${tmp.getCanonicalPath}/$tableName'
+ | tblproperties (
+ | primaryKey ='id',
+ | preCombineField = 'ts'
+ | )
""".stripMargin)
- val tableName2 = generateTableName
- spark.sql(
- s"""
- |create table $tableName2 (
- | id int,
- | name string,
- | price double,
- | ts long
- |) using hudi
- | location '${tmp.getCanonicalPath}/$tableName2'
- | tblproperties (
- | primaryKey ='id',
- | preCombineField = 'ts'
- | )
+ val tableName2 = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableName2 (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | location '${tmp.getCanonicalPath}/$tableName2'
+ | tblproperties (
+ | primaryKey ='id',
+ | preCombineField = 'ts'
+ | )
""".stripMargin)
- spark.sql(
- s"""
- |insert into $tableName values
- | (1, 'a1', 10, 100),
- | (2, 'a2', 20, 200),
- | (3, 'a3', 20, 100)
- |""".stripMargin)
- spark.sql(
- s"""
- |insert into $tableName2 values
- | (1, 'u1', 10, 999),
- | (3, 'u3', 30, 9999),
- | (4, 'u4', 40, 99999)
- |""".stripMargin)
+ spark.sql(
+ s"""
+ |insert into $tableName values
+ | (1, 'a1', 10, 100),
+ | (2, 'a2', 20, 200),
+ | (3, 'a3', 20, 100)
+ |""".stripMargin)
+ spark.sql(
+ s"""
+ |insert into $tableName2 values
+ | (1, 'u1', 10, 999),
+ | (3, 'u3', 30, 9999),
+ | (4, 'u4', 40, 99999)
+ |""".stripMargin)
- spark.sql(
- s"""
- |merge into $tableName as oldData
- |using $tableName2
- |on oldData.id = $tableName2.id
- |when matched and oldData.price = $tableName2.price then update set
oldData.name = $tableName2.name
- |
- |""".stripMargin)
+ // test with optimized sql merge enabled / disabled.
+ spark.sql(s"set
${SPARK_SQL_OPTIMIZED_WRITES.key()}=$sparkSqlOptimizedWrites")
- checkAnswer(s"select id, name, price, ts from $tableName")(
- Seq(1, "u1", 10.0, 100),
- Seq(3, "a3", 20.0, 100),
- Seq(2, "a2", 20.0, 200)
- )
+ spark.sql(
+ s"""
+ |merge into $tableName as oldData
+ |using $tableName2
+ |on oldData.id = $tableName2.id
+ |when matched and oldData.price = $tableName2.price then update
set oldData.name = $tableName2.name
+ |
+ |""".stripMargin)
- val errorMessage = if (HoodieSparkUtils.gteqSpark3_1) {
- "Only simple conditions of the form `t.id = s.id` using primary key or
partition path " +
- "columns are allowed on tables with primary key. (illegal column(s)
used: `price`"
- } else {
- "Only simple conditions of the form `t.id = s.id` using primary key or
partition path " +
- "columns are allowed on tables with primary key. (illegal column(s)
used: `price`;"
- }
+ checkAnswer(s"select id, name, price, ts from $tableName")(
+ Seq(1, "u1", 10.0, 100),
+ Seq(3, "a3", 20.0, 100),
+ Seq(2, "a2", 20.0, 200)
+ )
- checkException(
- s"""
- |merge into $tableName as oldData
- |using $tableName2
- |on oldData.id = $tableName2.id and oldData.price =
$tableName2.price
- |when matched then update set oldData.name = $tableName2.name
- |when not matched then insert *
- |""".stripMargin)(errorMessage)
+ val errorMessage = if (HoodieSparkUtils.gteqSpark3_1) {
+ "Only simple conditions of the form `t.id = s.id` using primary key
or partition path " +
+ "columns are allowed on tables with primary key. (illegal
column(s) used: `price`"
+ } else {
+ "Only simple conditions of the form `t.id = s.id` using primary key
or partition path " +
+ "columns are allowed on tables with primary key. (illegal
column(s) used: `price`;"
+ }
- //test with multiple pks
- val tableName3 = generateTableName
- spark.sql(
- s"""
- |create table $tableName3 (
- | id int,
- | name string,
- | price double,
- | ts long
- |) using hudi
- | location '${tmp.getCanonicalPath}/$tableName3'
- | tblproperties (
- | primaryKey ='id,name',
- | preCombineField = 'ts'
- | )
+ checkException(
+ s"""
+ |merge into $tableName as oldData
+ |using $tableName2
+ |on oldData.id = $tableName2.id and oldData.price =
$tableName2.price
+ |when matched then update set oldData.name = $tableName2.name
+ |when not matched then insert *
+ |""".stripMargin)(errorMessage)
+
+ //test with multiple pks
+ val tableName3 = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableName3 (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | location '${tmp.getCanonicalPath}/$tableName3'
+ | tblproperties (
+ | primaryKey ='id,name',
+ | preCombineField = 'ts'
+ | )
""".stripMargin)
- val errorMessage2 = if (HoodieSparkUtils.gteqSpark3_1) {
- "Hudi tables with primary key are required to match on all primary key
colums. Column: 'name' not found"
- } else {
- "Hudi tables with primary key are required to match on all primary key
colums. Column: 'name' not found;"
- }
+ val errorMessage2 = if (HoodieSparkUtils.gteqSpark3_1) {
+ "Hudi tables with primary key are required to match on all primary
key colums. Column: 'name' not found"
+ } else {
+ "Hudi tables with primary key are required to match on all primary
key colums. Column: 'name' not found;"
+ }
- checkException(
- s"""
- |merge into $tableName3 as oldData
- |using $tableName2
- |on oldData.id = $tableName2.id
- |when matched then update set oldData.name = $tableName2.name
- |when not matched then insert *
- |""".stripMargin)(errorMessage2)
+ checkException(
+ s"""
+ |merge into $tableName3 as oldData
+ |using $tableName2
+ |on oldData.id = $tableName2.id
+ |when matched then update set oldData.name = $tableName2.name
+ |when not matched then insert *
+ |""".stripMargin)(errorMessage2)
+ }
}
}
test("Test pkless complex merge cond") {
withRecordType()(withTempDir { tmp =>
spark.sql("set hoodie.payload.combined.schema.validate = true")
+ spark.sql(s"set ${SPARK_SQL_OPTIMIZED_WRITES.key()}=true")
val tableName = generateTableName
// Create table
spark.sql(
@@ -212,6 +219,7 @@ class TestMergeIntoTableWithNonRecordKeyField extends
HoodieSparkSqlTestBase wit
for (withPrecombine <- Seq(true, false)) {
withRecordType()(withTempDir { tmp =>
spark.sql("set hoodie.payload.combined.schema.validate = true")
+ spark.sql(s"set ${SPARK_SQL_OPTIMIZED_WRITES.key()}=true")
val tableName = generateTableName
val prekstr = if (withPrecombine) "tblproperties (preCombineField =
'ts')" else ""
@@ -264,6 +272,7 @@ class TestMergeIntoTableWithNonRecordKeyField extends
HoodieSparkSqlTestBase wit
test("Test MergeInto Basic pkless") {
withRecordType()(withTempDir { tmp =>
spark.sql("set hoodie.payload.combined.schema.validate = true")
+ spark.sql(s"set ${SPARK_SQL_OPTIMIZED_WRITES.key()}=true")
val tableName = generateTableName
// Create table
spark.sql(
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala
index 23d192ba099..f244167d142 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala
@@ -17,13 +17,14 @@
package org.apache.spark.sql.hudi
+import org.apache.hudi.DataSourceWriteOptions.SPARK_SQL_OPTIMIZED_WRITES
import org.apache.hudi.HoodieSparkUtils.isSpark2
class TestUpdateTable extends HoodieSparkSqlTestBase {
test("Test Update Table") {
withRecordType()(withTempDir { tmp =>
- Seq(true, false).foreach { optimizedSqlEnabled =>
+ Seq(true, false).foreach { sparkSqlOptimizedWrites =>
Seq("cow", "mor").foreach { tableType =>
val tableName = generateTableName
// create table
@@ -50,7 +51,7 @@ class TestUpdateTable extends HoodieSparkSqlTestBase {
)
// test with optimized sql writes enabled / disabled.
- spark.sql(s"set
hoodie.spark.sql.writes.optimized.enable=$optimizedSqlEnabled")
+ spark.sql(s"set
${SPARK_SQL_OPTIMIZED_WRITES.key()}=$sparkSqlOptimizedWrites")
// update data
spark.sql(s"update $tableName set price = 20 where id = 1")
@@ -95,7 +96,7 @@ class TestUpdateTable extends HoodieSparkSqlTestBase {
)
// test with optimized sql writes enabled.
- spark.sql(s"set hoodie.spark.sql.writes.optimized.enable=true")
+ spark.sql(s"set ${SPARK_SQL_OPTIMIZED_WRITES.key()}=true")
// update data
spark.sql(s"update $tableName set price = 20 where id = 1")
@@ -256,7 +257,7 @@ class TestUpdateTable extends HoodieSparkSqlTestBase {
test("Test decimal type") {
withTempDir { tmp =>
- Seq(true, false).foreach { optimizedSqlEnabled =>
+ Seq(true, false).foreach { sparkSqlOptimizedWrites =>
val tableName = generateTableName
// create table
spark.sql(
@@ -283,7 +284,7 @@ class TestUpdateTable extends HoodieSparkSqlTestBase {
)
// test with optimized sql writes enabled / disabled.
- spark.sql(s"set
hoodie.spark.sql.writes.optimized.enable=$optimizedSqlEnabled")
+ spark.sql(s"set
${SPARK_SQL_OPTIMIZED_WRITES.key()}=$sparkSqlOptimizedWrites")
spark.sql(s"update $tableName set price = 22 where id = 1")
checkAnswer(s"select id, name, price, ts from $tableName")(