This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/branch-0.x by this push:
new 8b16794b34c [HUDI-9088] Fix unnecessary scanning of target table in
MERGE INTO on Spark (#12933)
8b16794b34c is described below
commit 8b16794b34c881675e39a97d3e56f3656f7d8b31
Author: Y Ethan Guo <[email protected]>
AuthorDate: Mon Apr 21 14:02:09 2025 -0700
[HUDI-9088] Fix unnecessary scanning of target table in MERGE INTO on Spark
(#12933)
---
.../scala/org/apache/hudi/DataSourceOptions.scala | 4 +-
.../spark/sql/hudi/ProvidesHoodieConfig.scala | 2 +
.../hudi/command/MergeIntoHoodieTableCommand.scala | 81 +++++---
.../sql/hudi/common/HoodieSparkSqlTestBase.scala | 80 +++++++-
.../spark/sql/hudi/dml/TestMergeIntoTable.scala | 210 ++++++++++++---------
5 files changed, 257 insertions(+), 120 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 45134f91278..a74b3eba1f6 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -29,10 +29,10 @@ import org.apache.hudi.common.util.Option
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.config.{HoodieClusteringConfig, HoodieWriteConfig}
import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncConfigHolder,
HiveSyncTool}
+import org.apache.hudi.keygen.{CustomKeyGenerator, NonpartitionedKeyGenerator,
SimpleKeyGenerator}
import org.apache.hudi.keygen.KeyGenUtils.inferKeyGeneratorType
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import
org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory.{getKeyGeneratorClassNameFromType,
inferKeyGeneratorTypeFromWriteConfig}
-import org.apache.hudi.keygen.{CustomKeyGenerator, NonpartitionedKeyGenerator,
SimpleKeyGenerator}
import org.apache.hudi.sync.common.HoodieSyncConfig
import org.apache.hudi.util.JFunction
@@ -668,7 +668,7 @@ object DataSourceWriteOptions {
.defaultValue("true")
.markAdvanced()
.sinceVersion("0.14.0")
- .withDocumentation("Controls whether spark sql prepped update, delete, and
merge are enabled.")
+ .withDocumentation("Controls whether spark sql prepped update and delete
are enabled.")
val OVERWRITE_MODE: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.overwrite.mode")
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index e1bae0dbf3c..4e67742cd4d 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -32,6 +32,7 @@ import org.apache.hudi.keygen.{ComplexKeyGenerator,
CustomAvroKeyGenerator, Cust
import org.apache.hudi.sql.InsertMode
import org.apache.hudi.sync.common.HoodieSyncConfig
import org.apache.hudi.{DataSourceWriteOptions, HoodieFileIndex}
+
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo,
Literal}
@@ -48,6 +49,7 @@ import org.apache.spark.sql.{SaveMode, SparkSession}
import org.slf4j.LoggerFactory
import java.util.Locale
+
import scala.collection.JavaConverters._
trait ProvidesHoodieConfig extends Logging {
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index 2449817458d..dfbf9d3785c 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.hudi.command
-import org.apache.avro.Schema
+import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions,
HoodieSparkSqlWriter, HoodieSparkUtils, SparkAdapterSupport}
import org.apache.hudi.AvroConversionUtils.convertStructTypeToAvroSchema
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.HoodieSparkSqlWriter.CANONICALIZE_SCHEMA
@@ -30,19 +30,20 @@ import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hive.HiveSyncConfigHolder
import org.apache.hudi.sync.common.HoodieSyncConfig
import org.apache.hudi.util.JFunction.scalaFunction1Noop
-import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions,
HoodieSparkSqlWriter, HoodieSparkUtils, SparkAdapterSupport}
-import org.apache.spark.sql.HoodieCatalystExpressionUtils.{MatchCast,
attributeEquals}
+
+import org.apache.avro.Schema
import org.apache.spark.sql._
+import org.apache.spark.sql.HoodieCatalystExpressionUtils.{attributeEquals,
MatchCast}
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
-import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReference
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute,
AttributeReference, BoundReference, EqualTo, Expression, Literal,
NamedExpression, PredicateHelper}
+import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReference
import org.apache.spark.sql.catalyst.plans.LeftOuter
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
import org.apache.spark.sql.hudi.ProvidesHoodieConfig
import org.apache.spark.sql.hudi.ProvidesHoodieConfig.{combineOptions,
getPartitionPathFieldWriteConfig}
import org.apache.spark.sql.hudi.analysis.HoodieAnalysis.failAnalysis
-import
org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.{CoercedAttributeReference,
encodeAsBase64String, stripCasting, toStructType}
+import
org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.{encodeAsBase64String,
stripCasting, toStructType, CoercedAttributeReference}
import
org.apache.spark.sql.hudi.command.PartialAssignmentMode.PartialAssignmentMode
import org.apache.spark.sql.hudi.command.payload.ExpressionPayload
import org.apache.spark.sql.hudi.command.payload.ExpressionPayload._
@@ -105,6 +106,8 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
private lazy val targetTableType = hoodieCatalogTable.tableTypeName
+ private lazy val isPrimaryKeylessTable =
hoodieCatalogTable.primaryKeys.isEmpty
+
/**
* Mapping of the Merge-Into-Table (MIT) command's [[targetTable]] attribute
into
* corresponding expression (involving reference from the [[sourceTable]])
from the MIT
@@ -275,11 +278,11 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
sparkSession.conf.set("spark.sql.crossJoin.enabled","true")
}
- val projectedJoinedDF: DataFrame = projectedJoinedDataset
+ val processedInputDf: DataFrame = getProcessedInputDf
// Create the write parameters
val props = buildMergeIntoConfig(hoodieCatalogTable)
// Do the upsert
- executeUpsert(projectedJoinedDF, props)
+ executeUpsert(processedInputDf, props)
// Refresh the table in the catalog
sparkSession.catalog.refreshTable(hoodieCatalogTable.table.qualifiedName)
@@ -290,9 +293,25 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
private val insertingActions: Seq[InsertAction] =
mergeInto.notMatchedActions.collect { case u: InsertAction => u}
private val deletingActions: Seq[DeleteAction] =
mergeInto.matchedActions.collect { case u: DeleteAction => u}
+ private def hasPrimaryKey(): Boolean = {
+ hoodieCatalogTable.tableConfig.getRecordKeyFields.isPresent
+ }
+
/**
- * Here we're adjusting incoming (source) dataset in case its schema is
divergent from
- * the target table, to make sure it (at a bare minimum)
+ * Here we're processing the logical plan of the source table and optionally
the target
+ * table to get it prepared for writing the data into the Hudi table:
+ * <ul>
+ * <li> For a target table with record key(s) configured, the source table
+ * [[mergeInto.sourceTable]] is used.
+ * <li> For a primary keyless target table, the source table
[[mergeInto.sourceTable]]
+ * and target table [[mergeInto.targetTable]] are left-outer joined based
on the
+ * merge condition so that the record key stored in the record key meta
column
+ * (`_hoodie_record_key`) is attached to the input records if they are
updates.
+ * </ul>
+ *
+ * After getting the initial logical plan to process as above, we're
adjusting incoming
+ * (source) dataset in case its schema is divergent from the target table,
to make sure
+ * it contains all the required columns for MERGE INTO (at a bare minimum)
*
* <ol>
* <li>Contains "primary-key" column (as defined by target table's
config)</li>
@@ -328,29 +347,31 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
* <li>{@code ts = source.sts}</li>
* </ul>
*/
- def projectedJoinedDataset: DataFrame = {
+ private def getProcessedInputDf: DataFrame = {
val resolver = sparkSession.sessionState.analyzer.resolver
- // We want to join the source and target tables.
- // Then we want to project the output so that we have the meta columns
from the target table
- // followed by the data columns of the source table
- val tableMetaCols = mergeInto.targetTable.output.filter(a =>
isMetaField(a.name))
- val joinData =
sparkAdapter.getCatalystPlanUtils.createMITJoin(mergeInto.sourceTable,
mergeInto.targetTable, LeftOuter, Some(mergeInto.mergeCondition), "NONE")
- val incomingDataCols =
joinData.output.filterNot(mergeInto.targetTable.outputSet.contains)
- // for pkless table, we need to project the meta columns
- val hasPrimaryKey =
hoodieCatalogTable.tableConfig.getRecordKeyFields.isPresent
- val projectedJoinPlan = if (!hasPrimaryKey ||
sparkSession.sqlContext.conf.getConfString(SPARK_SQL_OPTIMIZED_WRITES.key(),
"false") == "true") {
+ // For pkless table, we need to project the meta columns by joining with
the target table;
+ // for a Hudi table with record key, we use the source table and rely on
Hudi's tagging
+ // to identify inserts, updates, and deletes to avoid the join
+ val inputPlan = if (!hasPrimaryKey()) {
+ // For a primary keyless target table, join the source and target tables.
+ // Then we want to project the output so that we have the meta columns
from the target table
+ // followed by the data columns of the source table
+ val tableMetaCols = mergeInto.targetTable.output.filter(a =>
isMetaField(a.name))
+ val joinData =
sparkAdapter.getCatalystPlanUtils.createMITJoin(mergeInto.sourceTable,
mergeInto.targetTable, LeftOuter, Some(mergeInto.mergeCondition), "NONE")
+ val incomingDataCols =
joinData.output.filterNot(mergeInto.targetTable.outputSet.contains)
Project(tableMetaCols ++ incomingDataCols, joinData)
} else {
- Project(incomingDataCols, joinData)
+ // For a target table with record key(s) configured, the source table is
used
+ mergeInto.sourceTable
}
- val projectedJoinOutput = projectedJoinPlan.output
+ val inputPlanAttributes = inputPlan.output
val requiredAttributesMap = recordKeyAttributeToConditionExpression ++
preCombineAttributeAssociatedExpression
val (existingAttributesMap, missingAttributesMap) =
requiredAttributesMap.partition {
- case (keyAttr, _) => projectedJoinOutput.exists(attr =>
resolver(keyAttr.name, attr.name))
+ case (keyAttr, _) => inputPlanAttributes.exists(attr =>
resolver(keyAttr.name, attr.name))
}
// This is to handle the situation where condition is something like
"s0.s_id = t0.id" so In the source table
@@ -362,7 +383,7 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
// them according to aforementioned heuristic) to meet Hudi's
requirements
val additionalColumns: Seq[NamedExpression] =
missingAttributesMap.flatMap {
- case (keyAttr, sourceExpression) if !projectedJoinOutput.exists(attr
=> resolver(attr.name, keyAttr.name)) =>
+ case (keyAttr, sourceExpression) if !inputPlanAttributes.exists(attr
=> resolver(attr.name, keyAttr.name)) =>
Seq(Alias(sourceExpression, keyAttr.name)())
case _ => Seq()
@@ -372,7 +393,7 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
// matches to that one of the target table. This is necessary b/c unlike
Spark, Avro is case-sensitive
// and therefore would fail downstream if case of corresponding columns
don't match
val existingAttributes = existingAttributesMap.map(_._1)
- val adjustedSourceTableOutput = projectedJoinOutput.map { attr =>
+ val adjustedSourceTableOutput = inputPlanAttributes.map { attr =>
existingAttributes.find(keyAttr => resolver(keyAttr.name, attr.name))
match {
// To align the casing we just rename the attribute to match that one
of the
// target table
@@ -381,7 +402,7 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
}
}
- val amendedPlan = Project(adjustedSourceTableOutput ++ additionalColumns,
projectedJoinPlan)
+ val amendedPlan = Project(adjustedSourceTableOutput ++ additionalColumns,
inputPlan)
Dataset.ofRows(sparkSession, amendedPlan)
}
@@ -575,7 +596,7 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
// NOTE: We're relying on [[sourceDataset]] here instead of
[[mergeInto.sourceTable]],
// as it could be amended to add missing primary-key and/or
pre-combine columns.
// Please check [[sourceDataset]] scala-doc for more details
- (projectedJoinedDataset.queryExecution.analyzed.output ++
mergeInto.targetTable.output).filterNot(a => isMetaField(a.name))
+ (getProcessedInputDf.queryExecution.analyzed.output ++
mergeInto.targetTable.output).filterNot(a => isMetaField(a.name))
}
private def resolvesToSourceAttribute(expr: Expression): Boolean = {
@@ -618,9 +639,8 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
val preCombineField = hoodieCatalogTable.preCombineKey.getOrElse("")
val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable,
tableConfig)
// for pkless tables, we need to enable optimized merge
- val hasPrimaryKey = tableConfig.getRecordKeyFields.isPresent
- val enableOptimizedMerge = if (!hasPrimaryKey) "true" else
sparkSession.sqlContext.conf.getConfString(SPARK_SQL_OPTIMIZED_WRITES.key(),
"false")
- val keyGeneratorClassName = if (enableOptimizedMerge == "true") {
+ val isPrimaryKeylessTable = !hasPrimaryKey()
+ val keyGeneratorClassName = if (isPrimaryKeylessTable) {
classOf[MergeIntoKeyGenerator].getCanonicalName
} else {
classOf[SqlKeyGenerator].getCanonicalName
@@ -659,7 +679,8 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
CANONICALIZE_SCHEMA.key -> "false",
SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key -> "true",
HoodieSparkSqlWriter.SQL_MERGE_INTO_WRITES.key -> "true",
- HoodieWriteConfig.SPARK_SQL_MERGE_INTO_PREPPED_KEY ->
enableOptimizedMerge,
+ // Only primary keyless table requires prepped keys and upsert
+ HoodieWriteConfig.SPARK_SQL_MERGE_INTO_PREPPED_KEY ->
isPrimaryKeylessTable.toString,
HoodieWriteConfig.COMBINE_BEFORE_UPSERT.key() ->
(!StringUtils.isNullOrEmpty(preCombineField)).toString
)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
index 33f84236faf..ddc745ee5fe 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
@@ -17,22 +17,30 @@
package org.apache.spark.sql.hudi.common
-import org.apache.hudi.HoodieSparkRecordMerger
-import org.apache.hudi.common.config.HoodieStorageConfig
-import org.apache.hudi.common.model.HoodieAvroRecordMerger
+import org.apache.hudi.{HoodieSparkRecordMerger, HoodieSparkUtils}
+import org.apache.hudi.common.config.{HoodieCommonConfig,
HoodieMetadataConfig, HoodieStorageConfig}
+import org.apache.hudi.common.engine.HoodieLocalEngineContext
+import org.apache.hudi.common.model.{HoodieAvroRecordMerger, HoodieRecord}
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.view.{FileSystemViewManager,
FileSystemViewStorageConfig, SyncableFileSystemView}
+import org.apache.hudi.common.testutils.HoodieTestUtils
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.ExceptionUtil.getRootCause
import org.apache.hudi.hadoop.fs.HadoopFSUtils
import org.apache.hudi.index.inmemory.HoodieInMemoryHashIndex
+import org.apache.hudi.metadata.HoodieTableMetadata
+import org.apache.hudi.storage.{HoodieStorage, StoragePath}
import org.apache.hudi.testutils.HoodieClientTestUtils.{createMetaClient,
getSparkConfForTest}
+import org.apache.hudi.util.JFunction
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkConf
+import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import
org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.checkMessageContains
-import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.types.StructField
import org.apache.spark.util.Utils
import org.joda.time.DateTimeZone
import org.scalactic.source
@@ -155,6 +163,13 @@ class HoodieSparkSqlTestBase extends FunSuite with
BeforeAndAfterAll {
assertResult(true)(hasException)
}
+ protected def validateTableSchema(tableName: String,
+ expectedStructFields: List[StructField]):
Unit = {
+ assertResult(expectedStructFields)(
+ spark.sql(s"select * from $tableName").schema.fields
+ .filter(e =>
!HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(e.name)))
+ }
+
def dropTypeLiteralPrefix(value: Any): Any = {
value match {
case s: String =>
@@ -205,6 +220,30 @@ class HoodieSparkSqlTestBase extends FunSuite with
BeforeAndAfterAll {
}
}
+ protected def withSparkSqlSessionConfig(configNameValues: (String, String)*
+ )(f: => Unit): Unit = {
+ withSparkSqlSessionConfigWithCondition(configNameValues.map(e => (e,
true)): _*)(f)
+ }
+
+ protected def withSparkSqlSessionConfigWithCondition(configNameValues:
((String, String), Boolean)*
+ )(f: => Unit): Unit = {
+ try {
+ configNameValues.foreach { case ((configName, configValue), condition) =>
+ if (condition) {
+ spark.sql(s"set $configName=$configValue")
+ }
+ }
+ f
+ } finally {
+ if (HoodieSparkUtils.gteqSpark3_1) {
+ // Spark 3.0.x does not support "RESET configuration_key"
+ configNameValues.foreach { case ((configName, configValue), condition)
=>
+ spark.sql(s"reset $configName")
+ }
+ }
+ }
+ }
+
protected def withRecordType(recordTypes: Seq[HoodieRecordType] =
Seq(HoodieRecordType.AVRO, HoodieRecordType.SPARK),
recordConfig: Map[HoodieRecordType, Map[String,
String]]=Map.empty)(f: => Unit) {
// TODO HUDI-5264 Test parquet log with avro record in spark sql test
@@ -250,6 +289,39 @@ object HoodieSparkSqlTestBase {
.getActiveTimeline.getInstantDetails(cleanInstant).get)
}
+ def getMetaClientAndFileSystemView(basePath: String):
+ (HoodieTableMetaClient, SyncableFileSystemView) = {
+ val storageConf = HoodieTestUtils.getDefaultStorageConf
+ val metaClient: HoodieTableMetaClient =
+
HoodieTableMetaClient.builder.setConf(storageConf).setBasePath(basePath).build
+ val metadataConfig = HoodieMetadataConfig.newBuilder.build
+ val engineContext = new HoodieLocalEngineContext(storageConf)
+ val viewManager: FileSystemViewManager =
FileSystemViewManager.createViewManager(
+ engineContext, FileSystemViewStorageConfig.newBuilder.build,
+ HoodieCommonConfig.newBuilder.build,
+ JFunction.toJavaSerializableFunctionUnchecked(
+ (_: HoodieTableMetaClient) => {
+ HoodieTableMetadata.create(
+ engineContext, metaClient.getStorage, metadataConfig,
metaClient.getBasePath.toString)
+ })
+ )
+ val fsView: SyncableFileSystemView =
viewManager.getFileSystemView(metaClient)
+ (metaClient, fsView)
+ }
+
+ /**
+ * Replaces the existing file with an empty file to simulate a corrupted
+ * file in a Hudi table.
+ *
+ * @param storage [[HoodieStorage]] instance
+ * @param filePath file path
+ */
+ def replaceWithEmptyFile(storage: HoodieStorage,
+ filePath: StoragePath): Unit = {
+ storage.deleteFile(filePath)
+ storage.createNewFile(filePath)
+ }
+
private def checkMessageContains(e: Throwable, text: String): Boolean =
e.getMessage.trim.contains(text.trim)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable.scala
index 7fe9a753014..b26d42ab33f 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable.scala
@@ -17,109 +17,151 @@
package org.apache.spark.sql.hudi.dml
-import org.apache.hudi.DataSourceWriteOptions.SPARK_SQL_OPTIMIZED_WRITES
-import
org.apache.hudi.config.HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT
import org.apache.hudi.{DataSourceReadOptions, HoodieDataSourceHelpers,
HoodieSparkUtils, ScalaAssertionSupport}
-import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.DataSourceWriteOptions.SPARK_SQL_OPTIMIZED_WRITES
import org.apache.hudi.hadoop.fs.HadoopFSUtils
-import org.apache.hudi.{DataSourceReadOptions, HoodieDataSourceHelpers,
HoodieSparkUtils, ScalaAssertionSupport}
+
import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType,
StructField}
+import org.junit.jupiter.api.Assertions.assertTrue
+
+import scala.collection.JavaConverters._
class TestMergeIntoTable extends HoodieSparkSqlTestBase with
ScalaAssertionSupport {
test("Test MergeInto Basic") {
Seq(true, false).foreach { sparkSqlOptimizedWrites =>
withRecordType()(withTempDir { tmp =>
- spark.sql("set hoodie.payload.combined.schema.validate = false")
- val tableName = generateTableName
- // Create table
- spark.sql(
- s"""
- |create table $tableName (
- | id int,
- | name string,
- | price double,
- | ts long
- |) using hudi
- | location '${tmp.getCanonicalPath}'
- | tblproperties (
- | primaryKey ='id',
- | preCombineField = 'ts'
- | )
+ withSparkSqlSessionConfig("hoodie.payload.combined.schema.validate" ->
"false",
+ // Column stats and partition stats need to be disabled to validate
+ // that there is no full target table scan during MERGE INTO
+ "hoodie.metadata.index.column.stats.enable" -> "false",
+ "hoodie.metadata.index.partition.stats.enable" -> "false") {
+ val tableName = generateTableName
+ // Create table
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts int,
+ | partition string
+ |) using hudi
+ | partitioned by (partition)
+ | location '${tmp.getCanonicalPath}'
+ | tblproperties (
+ | primaryKey ='id',
+ | preCombineField = 'ts'
+ | )
""".stripMargin)
- // test with optimized sql merge enabled / disabled.
- spark.sql(s"set
${SPARK_SQL_OPTIMIZED_WRITES.key()}=$sparkSqlOptimizedWrites")
+ // test with optimized sql merge enabled / disabled.
+ spark.sql(s"set
${SPARK_SQL_OPTIMIZED_WRITES.key()}=$sparkSqlOptimizedWrites")
- // First merge with a extra input field 'flag' (insert a new record)
- spark.sql(
- s"""
- | merge into $tableName
- | using (
- | select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '1' as
flag
- | ) s0
- | on s0.id = $tableName.id
- | when matched and flag = '1' then update set
- | id = s0.id, name = s0.name, price = s0.price, ts = s0.ts
- | when not matched and flag = '1' then insert *
+ val structFields = List(
+ StructField("id", IntegerType, nullable = true),
+ StructField("name", StringType, nullable = true),
+ StructField("price", DoubleType, nullable = true),
+ StructField("ts", IntegerType, nullable = true),
+ StructField("partition", StringType, nullable = true))
+ // First merge with an extra input field 'flag' (insert a new record)
+ spark.sql(
+ s"""
+ | merge into $tableName
+ | using (
+ | select 1 as id, 'a1' as name, 10 as price, 1000 as ts, 'p1'
as partition, '1' as flag
+ | union
+ | select 2 as id, 'a2' as name, 20 as price, 1000 as ts, 'p2'
as partition, '1' as flag
+ | union
+ | select 3 as id, 'a3' as name, 30 as price, 1000 as ts, 'p3'
as partition, '1' as flag
+ | ) s0
+ | on s0.id = $tableName.id
+ | when matched and flag = '1' then update set
+ | id = s0.id, name = s0.name, price = s0.price, ts = s0.ts
+ | when not matched and flag = '1' then insert *
""".stripMargin)
- checkAnswer(s"select id, name, price, ts from $tableName")(
- Seq(1, "a1", 10.0, 1000)
- )
+ validateTableSchema(tableName, structFields)
+ checkAnswer(s"select id, name, price, ts, partition from
$tableName")(
+ Seq(1, "a1", 10.0, 1000, "p1"),
+ Seq(2, "a2", 20.0, 1000, "p2"),
+ Seq(3, "a3", 30.0, 1000, "p3")
+ )
- // Second merge (update the record)
- spark.sql(
- s"""
- | merge into $tableName
- | using (
- | select 1 as id, 'a1' as name, 10 as price, 1001 as ts
- | ) s0
- | on s0.id = $tableName.id
- | when matched then update set
- | id = s0.id, name = s0.name, price = s0.price +
$tableName.price, ts = s0.ts
- | when not matched then insert *
- """.stripMargin)
- checkAnswer(s"select id, name, price, ts from $tableName")(
- Seq(1, "a1", 20.0, 1001)
- )
+ // Second merge (update the record) with different field names in
the source
+ spark.sql(
+ s"""
+ | merge into $tableName
+ | using (
+ | select 1 as _id, 'a1' as name, 10 as _price, 1001 as _ts,
'p1' as partition
+ | ) s0
+ | on s0._id = $tableName.id
+ | when matched then update set
+ | id = s0._id, name = s0.name, price = s0._price +
$tableName.price, ts = s0._ts
+ | """.stripMargin)
+ validateTableSchema(tableName, structFields)
+ checkAnswer(s"select id, name, price, ts, partition from
$tableName")(
+ Seq(1, "a1", 20.0, 1001, "p1"),
+ Seq(2, "a2", 20.0, 1000, "p2"),
+ Seq(3, "a3", 30.0, 1000, "p3")
+ )
- // the third time merge (update & insert the record)
- spark.sql(
- s"""
- | merge into $tableName
- | using (
- | select * from (
- | select 1 as id, 'a1' as name, 10 as price, 1002 as ts
- | union all
- | select 2 as id, 'a2' as name, 12 as price, 1001 as ts
- | )
- | ) s0
- | on s0.id = $tableName.id
- | when matched then update set
- | id = s0.id, name = s0.name, price = s0.price +
$tableName.price, ts = s0.ts
- | when not matched and s0.id % 2 = 0 then insert *
+ // the third time merge (update & insert the record)
+ spark.sql(
+ s"""
+ | merge into $tableName
+ | using (
+ | select * from (
+ | select 1 as id, 'a1' as name, 10 as price, 1002 as ts, 'p1'
as partition
+ | union all
+ | select 4 as id, 'a4' as name, 40 as price, 1001 as ts, 'p4'
as partition
+ | )
+ | ) s0
+ | on s0.id = $tableName.id
+ | when matched then update set
+ | id = s0.id, name = s0.name, price = s0.price +
$tableName.price, ts = s0.ts
+ | when not matched and s0.id % 2 = 0 then insert *
""".stripMargin)
- checkAnswer(s"select id, name, price, ts from $tableName")(
- Seq(1, "a1", 30.0, 1002),
- Seq(2, "a2", 12.0, 1001)
- )
+ validateTableSchema(tableName, structFields)
+ checkAnswer(s"select id, name, price, ts, partition from
$tableName")(
+ Seq(1, "a1", 30.0, 1002, "p1"),
+ Seq(2, "a2", 20.0, 1000, "p2"),
+ Seq(3, "a3", 30.0, 1000, "p3"),
+ Seq(4, "a4", 40.0, 1001, "p4")
+ )
- // the fourth merge (delete the record)
- spark.sql(
- s"""
- | merge into $tableName
- | using (
- | select 1 as id, 'a1' as name, 12 as price, 1003 as ts
- | ) s0
- | on s0.id = $tableName.id
- | when matched and s0.id != 1 then update set
- | id = s0.id, name = s0.name, price = s0.price, ts = s0.ts
- | when matched and s0.id = 1 then delete
- | when not matched then insert *
+ // Validate that MERGE INTO only scans affected partitions in the
target table
+ // Corrupt the files in other partitions not receiving updates
+ val (metaClient, fsv) =
HoodieSparkSqlTestBase.getMetaClientAndFileSystemView(tmp.getCanonicalPath)
+ Seq("p2", "p3", "p4").map(e => "partition=" + e).foreach(partition
=> {
+ assertTrue(fsv.getLatestFileSlices(partition).count() > 0)
+
fsv.getLatestFileSlices(partition).iterator().asScala.foreach(fileSlice => {
+ if (fileSlice.getBaseFile.isPresent) {
+ HoodieSparkSqlTestBase.replaceWithEmptyFile(
+ metaClient.getStorage,
fileSlice.getBaseFile.get.getStoragePath)
+ }
+ fileSlice.getLogFiles.iterator().asScala.foreach(logFile => {
+
HoodieSparkSqlTestBase.replaceWithEmptyFile(metaClient.getStorage,
logFile.getPath)
+ })
+ })
+ })
+ // the fourth merge (delete the record)
+ spark.sql(
+ s"""
+ | merge into $tableName
+ | using (
+ | select 1 as id, 'a1' as name, 12 as price, 1003 as ts, 'p1'
as partition
+ | ) s0
+ | on s0.id = $tableName.id
+ | when matched and s0.id != 1 then update set
+ | id = s0.id, name = s0.name, price = s0.price, ts = s0.ts
+ | when matched and s0.id = 1 then delete
+ | when not matched then insert *
""".stripMargin)
- val cnt = spark.sql(s"select * from $tableName where id = 1").count()
- assertResult(0)(cnt)
+ val cnt = spark.sql(s"select * from $tableName where partition =
'p1'").count()
+ assertResult(0)(cnt)
+ }
})
}
}