This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 3e4cebe29d6 [HUDI-8486] Enforce data type match for required columns
in Spark SQL MERGE INTO (#12798)
3e4cebe29d6 is described below
commit 3e4cebe29d6727f3e684af3ca2e0b7c18e921f69
Author: Davis-Zhang-Onehouse
<[email protected]>
AuthorDate: Thu Feb 27 06:41:58 2025 -0800
[HUDI-8486] Enforce data type match for required columns in Spark SQL MERGE
INTO (#12798)
Co-authored-by: Y Ethan Guo <[email protected]>
---
.../hudi/command/MergeIntoHoodieTableCommand.scala | 221 ++++--
.../src/test/resources/sql-statements.sql | 16 +-
.../TestPartitionStatsIndexWithSql.scala | 2 +-
.../sql/hudi/common/HoodieSparkSqlTestBase.scala | 59 +-
.../hudi/common/TestTableColumnTypeMismatch.scala | 816 +++++++++++++++++++++
.../apache/spark/sql/hudi/ddl/TestAlterTable.scala | 3 +-
.../spark/sql/hudi/dml/TestInsertTable.scala | 2 +-
.../sql/hudi/dml/TestMergeIntoLogOnlyTable.scala | 4 +-
.../spark/sql/hudi/dml/TestMergeIntoTable.scala | 119 +--
.../spark/sql/hudi/dml/TestMergeIntoTable2.scala | 208 +++---
.../TestMergeIntoTableWithNonRecordKeyField.scala | 6 +-
.../hudi/dml/TestMergeModeCommitTimeOrdering.scala | 3 +
.../hudi/dml/TestMergeModeEventTimeOrdering.scala | 5 +-
.../hudi/dml/TestPartialUpdateForMergeInto.scala | 30 +-
14 files changed, 1202 insertions(+), 292 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index 4588bb4893e..7f01bed85cd 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -57,6 +57,19 @@ import java.util.Base64
import scala.collection.JavaConverters._
+/**
+ * Exception thrown when field resolution fails during MERGE INTO validation
+ */
+class MergeIntoFieldResolutionException(message: String)
+ extends AnalysisException(s"MERGE INTO field resolution error: $message")
+
+/**
+ * Exception thrown when field type does not match between source and target
table
+ * during MERGE INTO validation
+ */
+class MergeIntoFieldTypeMismatchException(message: String)
+ extends AnalysisException(s"MERGE INTO field type mismatch error: $message")
+
/**
* Hudi's implementation of the {@code MERGE INTO} (MIT) Spark SQL statement.
*
@@ -172,7 +185,7 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
//
// Which (in the current design) could result in a record key
of the record being modified,
// which is not allowed.
- if (!resolvesToSourceAttribute(expr)) {
+ if (!resolvesToSourceAttribute(mergeInto.sourceTable, expr)) {
throw new AnalysisException("Only simple conditions of the form
`t.id = s.id` are allowed on the " +
s"primary-key and partition path column. Found `${attr.sql} =
${expr.sql}`")
}
@@ -241,36 +254,16 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
/**
* Please check description for [[primaryKeyAttributeToConditionExpression]]
*/
- private lazy val preCombineAttributeAssociatedExpression: Option[(Attribute,
Expression)] = {
- val resolver = sparkSession.sessionState.analyzer.resolver
+ private lazy val preCombineAttributeAssociatedExpression: Option[(Attribute,
Expression)] =
hoodieCatalogTable.preCombineKey.map { preCombineField =>
- val targetPreCombineAttribute =
- mergeInto.targetTable.output
- .find { attr => resolver(attr.name, preCombineField) }
- .get
-
- // To find corresponding "precombine" attribute w/in the [[sourceTable]]
we do
- // - Check if we can resolve the attribute w/in the source table as
is; if unsuccessful, then
- // - Check if in any of the update actions, right-hand side of the
assignment actually resolves
- // to it, in which case we will determine left-hand side expression
as the value of "precombine"
- // attribute w/in the [[sourceTable]]
- val sourceExpr = {
- mergeInto.sourceTable.output.find(attr => resolver(attr.name,
preCombineField)) match {
- case Some(attr) => attr
- case None =>
- updatingActions.flatMap(_.assignments).collectFirst {
- case Assignment(attr: AttributeReference, expr)
- if resolver(attr.name, preCombineField) &&
resolvesToSourceAttribute(expr) => expr
- } getOrElse {
- throw new AnalysisException(s"Failed to resolve precombine field
`${preCombineField}` w/in the source-table output")
- }
-
- }
- }
-
- (targetPreCombineAttribute, sourceExpr)
+ resolveFieldAssociationsBetweenSourceAndTarget(
+ sparkSession.sessionState.conf.resolver,
+ mergeInto.targetTable,
+ mergeInto.sourceTable,
+ Seq(preCombineField),
+ "precombine field",
+ updatingActions.flatMap(_.assignments)).head
}
- }
override def run(sparkSession: SparkSession): Seq[Row] = {
this.sparkSession = sparkSession
@@ -708,16 +701,6 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
(projectedJoinedDataset.queryExecution.analyzed.output ++
mergeInto.targetTable.output).filterNot(a => isMetaField(a.name))
}
- private def resolvesToSourceAttribute(expr: Expression): Boolean = {
- val sourceTableOutputSet = mergeInto.sourceTable.outputSet
- expr match {
- case attr: AttributeReference => sourceTableOutputSet.contains(attr)
- case MatchCast(attr: AttributeReference, _, _, _) =>
sourceTableOutputSet.contains(attr)
-
- case _ => false
- }
- }
-
private def validateInsertingAssignmentExpression(expr: Expression): Unit = {
val sourceTableOutput = mergeInto.sourceTable.output
expr.collect { case br: BoundReference => br }
@@ -819,11 +802,9 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
// Precombine field and record key field must be present in the assignment
clause of all insert actions for event time ordering mode.
// Check has no effect if we don't have such fields in target table or we
don't have insert actions
// Please note we are relying on merge mode in the table config as writer
merge mode is always "CUSTOM" for MIT.
- if (RecordMergeMode.EVENT_TIME_ORDERING.name()
-
.equals(getStringWithAltKeys(props.asJava.asInstanceOf[java.util.Map[String,
Object]],
- HoodieTableConfig.RECORD_MERGE_MODE))) {
+ if (isEventTimeOrdering(props)) {
insertActions.foreach(action =>
- hoodieCatalogTable.preCombineKey.foreach(
+ hoodieCatalogTable.preCombineKey.map(
field => {
validateTargetTableAttrExistsInAssignments(
sparkSession.sessionState.conf.resolver,
@@ -834,15 +815,84 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
}))
}
insertActions.foreach(action =>
- hoodieCatalogTable.preCombineKey.foreach(
- field => {
validateTargetTableAttrExistsInAssignments(
sparkSession.sessionState.conf.resolver,
mergeInto.targetTable,
hoodieCatalogTable.tableConfig.getRecordKeyFields.orElse(Array.empty),
"record key field",
- action.assignments)
- }))
+ action.assignments))
+
+ val insertAssignments = insertActions.flatMap(_.assignments)
+ checkSchemaMergeIntoCompatibility(insertAssignments, props)
+ }
+
+ private def isEventTimeOrdering(props: Map[String, String]) = {
+ RecordMergeMode.EVENT_TIME_ORDERING.name()
+
.equals(getStringWithAltKeys(props.asJava.asInstanceOf[java.util.Map[String,
Object]],
+ HoodieTableConfig.RECORD_MERGE_MODE))
+ }
+
+ /**
+ * Check the merge into schema compatibility between the target table and
the source table.
+ * The merge into schema compatibility requires data type matching for the
following fields:
+ * 1. Partition key
+ * 2. Primary key
+ * 3. Precombine key
+ *
+ * @param assignments the assignment clause of the insert/update statement
for figuring out
+ * the mapping between the target table and the source
table.
+ */
+ private def checkSchemaMergeIntoCompatibility(assignments: Seq[Assignment],
props: Map[String, String]): Unit = {
+ if (assignments.nonEmpty) {
+ // Assert data type matching for partition key
+ hoodieCatalogTable.partitionFields.foreach {
+ partitionField => {
+ try {
+ val association = resolveFieldAssociationsBetweenSourceAndTarget(
+ sparkSession.sessionState.conf.resolver,
+ mergeInto.targetTable,
+ mergeInto.sourceTable,
+ Seq(partitionField),
+ "partition key",
+ assignments).head
+ validateDataTypes(association._1, association._2, "Partition key")
+ } catch {
+ // Only catch AnalysisException from
resolveFieldAssociationsBetweenSourceAndTarget
+ case _: MergeIntoFieldResolutionException =>
+ }
+ }
+ }
+ val primaryAttributeAssociatedExpression: Array[(Attribute, Expression)]
=
+ resolveFieldAssociationsBetweenSourceAndTarget(
+ sparkSession.sessionState.conf.resolver,
+ mergeInto.targetTable,
+ mergeInto.sourceTable,
+ hoodieCatalogTable.primaryKeys,
+ "primary key",
+ assignments).toArray
+ primaryAttributeAssociatedExpression.foreach { case (attr, expr) =>
+ validateDataTypes(attr, expr, "Primary key")
+ }
+ if (isEventTimeOrdering(props)) {
+ hoodieCatalogTable.preCombineKey.map {
+ preCombineField => {
+ try {
+ val association = resolveFieldAssociationsBetweenSourceAndTarget(
+ sparkSession.sessionState.conf.resolver,
+ mergeInto.targetTable,
+ mergeInto.sourceTable,
+ Seq(preCombineField),
+ "precombine field",
+ assignments).head
+ validateDataTypes(association._1, association._2, "Precombine
field")
+ } catch {
+ // Only catch AnalysisException from
resolveFieldAssociationsBetweenSourceAndTarget
+ case _: MergeIntoFieldResolutionException =>
+ }
+ }
+ }
+ }
+ }
}
private def checkUpdatingActions(updateActions: Seq[UpdateAction], props:
Map[String, String]): Unit = {
@@ -854,6 +904,9 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
s"The number of update assignments[${update.assignments.length}] must
be less than or equal to the " +
s"targetTable field size[${targetTableSchema.length}]"))
+ val updateAssignments = updateActions.flatMap(_.assignments)
+ checkSchemaMergeIntoCompatibility(updateAssignments, props)
+
if (targetTableType == MOR_TABLE_TYPE_OPT_VAL) {
// For MOR table, the target table field cannot be the right-value in
the update action.
updateActions.foreach(update => {
@@ -924,26 +977,82 @@ object MergeIntoHoodieTableCommand {
fields: Seq[String],
fieldType: String,
assignments:
Seq[Assignment]): Unit = {
- // To find corresponding [[fieldType]] attribute w/in the [[assignments]]
we do
- // - Check if target table itself has the attribute
- // - Check if in any of the assignment actions, whose right-hand side
attribute
- // resolves to the source attribute. For example,
- // WHEN MATCHED THEN UPDATE SET targetTable.attribute = <expr>
- // the left-hand side of the assignment can be resolved to the target
fields we are
- // validating here.
fields.foreach { field =>
targetTable.output
.find(attr => resolver(attr.name, field))
- .getOrElse(throw new AnalysisException(s"Failed to resolve $fieldType
`$field` in target table"))
+ .getOrElse(throw new MergeIntoFieldResolutionException(s"Failed to
resolve $fieldType `$field` in target table"))
if (!assignments.exists {
case Assignment(attr: AttributeReference, _) if resolver(attr.name,
field) => true
case _ => false
}) {
- throw new AnalysisException(s"No matching assignment found for target
table $fieldType `$field`")
+ throw new MergeIntoFieldResolutionException(s"No matching assignment
found for target table $fieldType `$field`")
}
}
}
+
+ /**
+ * Generic method to resolve field associations between target and source
tables
+ *
+ * @param resolver The resolver to use
+ * @param targetTable The target table of the merge
+ * @param sourceTable The source table of the merge
+ * @param fields The fields from the target table whose association with the
source to be resolved
+ * @param fieldType String describing the type of field (for error messages)
+ * @param assignments The assignments clause of the merge into used for
resolving the association
+ * @return Sequence of resolved (target table attribute, source table
expression)
+ * mapping for target [[fields]].
+ *
+ * @throws AnalysisException if a field cannot be resolved
+ */
+ def resolveFieldAssociationsBetweenSourceAndTarget(resolver: Resolver,
+ targetTable: LogicalPlan,
+ sourceTable: LogicalPlan,
+ fields: Seq[String],
+ fieldType: String,
+ assignments:
Seq[Assignment]
+ ): Seq[(Attribute, Expression)] = {
+ fields.map { field =>
+ val targetAttribute = targetTable.output
+ .find(attr => resolver(attr.name, field))
+ .getOrElse(throw new MergeIntoFieldResolutionException(
+ s"Failed to resolve $fieldType `$field` in target table"))
+
+ val sourceExpr = sourceTable.output
+ .find(attr => resolver(attr.name, field))
+ .getOrElse {
+ assignments.collectFirst {
+ case Assignment(attr: AttributeReference, expr)
+ if resolver(attr.name, field) &&
resolvesToSourceAttribute(sourceTable, expr) => expr
+ }.getOrElse {
+ throw new MergeIntoFieldResolutionException(
+ s"Failed to resolve $fieldType `$field` w/in the source-table
output")
+ }
+ }
+
+ (targetAttribute, sourceExpr)
+ }
+ }
+
+ def resolvesToSourceAttribute(sourceTable: LogicalPlan, expr: Expression):
Boolean = {
+ val sourceTableOutputSet = sourceTable.outputSet
+ expr match {
+ case attr: AttributeReference => sourceTableOutputSet.contains(attr)
+ case MatchCast(attr: AttributeReference, _, _, _) =>
sourceTableOutputSet.contains(attr)
+
+ case _ => false
+ }
+ }
+
+ def validateDataTypes(attr: Attribute, expr: Expression, columnType:
String): Unit = {
+ if (attr.dataType != expr.dataType) {
+ throw new MergeIntoFieldTypeMismatchException(
+ s"$columnType data type mismatch between source table and target
table. " +
+ s"Target table uses ${attr.dataType} for column '${attr.name}', " +
+ s"source table uses ${expr.dataType} for '${expr.sql}'"
+ )
+ }
+ }
}
object PartialAssignmentMode extends Enumeration {
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/resources/sql-statements.sql
b/hudi-spark-datasource/hudi-spark/src/test/resources/sql-statements.sql
index 29fbb0ba745..4aacb206a59 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/resources/sql-statements.sql
+++ b/hudi-spark-datasource/hudi-spark/src/test/resources/sql-statements.sql
@@ -63,10 +63,10 @@ select id, name, price, cast(dt as string) from h0_p;
# CREATE TABLE
create table h1 (
- id bigint,
+ id int,
name string,
price double,
- ts bigint
+ ts int
) using hudi
options (
type = '${tableType}',
@@ -79,10 +79,10 @@ location '${tmpDir}/h1';
+----------+
create table h1_p (
- id bigint,
+ id int,
name string,
price double,
- ts bigint,
+ ts int,
dt string
) using hudi
partitioned by (dt)
@@ -205,7 +205,7 @@ select id, name, price, ts, dt from h1_p order by id;
merge into h1_p t0
using (
- select 5 as _id, 'a5' as _name, 10 as _price, 1000L as _ts, '2021-05-08' as
dt
+ select 5 as _id, 'a5' as _name, 10 as _price, 1000 as _ts, '2021-05-08' as dt
) s0
on s0._id = t0.id
when matched then update set id = _id, name = _name, price = _price, ts = _ts,
dt = s0.dt
@@ -224,11 +224,11 @@ select id, name, price, ts, dt from h1_p order by id;
merge into h1_p t0
using (
- select 1 as id, '_delete' as name, 10 as price, 1000L as ts, '2021-05-07' as
dt
+ select 1 as id, '_delete' as name, 10 as price, 1000 as ts, '2021-05-07' as
dt
union
- select 2 as id, '_update' as name, 12 as price, 1001L as ts, '2021-05-07' as
dt
+ select 2 as id, '_update' as name, 12 as price, 1001 as ts, '2021-05-07' as
dt
union
- select 6 as id, '_insert' as name, 10 as price, 1000L as ts, '2021-05-08' as
dt
+ select 6 as id, '_insert' as name, 10 as price, 1000 as ts, '2021-05-08' as
dt
) s0
on s0.id = t0.id
when matched and s0.name = '_update'
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
index 4436c907bac..8581af14234 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
@@ -161,7 +161,7 @@ class TestPartitionStatsIndexWithSql extends
HoodieSparkSqlTestBase {
spark.sql(
s"""
|merge into $tableName h0
- |using (select 1 as id, 'a1' as name, 11 as price, 1001 as ts,
'$partitionValue' as dt) s0
+ |using (select 1 as id, 'a1' as name, 11 as price, 1001 as ts,
cast('$partitionValue' as Date) as dt) s0
|on h0.id = s0.id
|when matched then update set *
|""".stripMargin)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
index d29b1c24c1e..74826e9cfaa 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
@@ -19,15 +19,21 @@ package org.apache.spark.sql.hudi.common
import org.apache.hudi.{DefaultSparkRecordMerger, HoodieSparkUtils}
import org.apache.hudi.HoodieFileIndex.DataSkippingFailureMode
-import org.apache.hudi.common.config.HoodieStorageConfig
-import org.apache.hudi.common.model.{HoodieAvroRecordMerger, HoodieRecord}
+import org.apache.hudi.common.config.{HoodieCommonConfig,
HoodieMetadataConfig, HoodieStorageConfig}
+import org.apache.hudi.common.engine.HoodieLocalEngineContext
+import org.apache.hudi.common.model.{FileSlice, HoodieAvroRecordMerger,
HoodieLogFile, HoodieRecord}
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
-import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient,
TableSchemaResolver}
+import org.apache.hudi.common.table.log.HoodieLogFileReader
+import org.apache.hudi.common.table.log.block.HoodieDeleteBlock
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils
+import org.apache.hudi.common.table.view.{FileSystemViewManager,
FileSystemViewStorageConfig, SyncableFileSystemView}
+import org.apache.hudi.common.testutils.HoodieTestUtils
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.ExceptionUtil.getRootCause
import org.apache.hudi.hadoop.fs.HadoopFSUtils
import org.apache.hudi.index.inmemory.HoodieInMemoryHashIndex
+import org.apache.hudi.metadata.HoodieTableMetadata
import org.apache.hudi.storage.HoodieStorage
import org.apache.hudi.testutils.HoodieClientTestUtils.{createMetaClient,
getSparkConfForTest}
@@ -39,13 +45,13 @@ import
org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.checkMessageConta
import org.apache.spark.sql.types.StructField
import org.apache.spark.util.Utils
import org.joda.time.DateTimeZone
-import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.scalactic.source
import org.scalatest.{BeforeAndAfterAll, FunSuite, Tag}
import org.slf4j.LoggerFactory
import java.io.File
-import java.util.TimeZone
+import java.util.{Collections, Optional, TimeZone}
import java.util.concurrent.atomic.AtomicInteger
import java.util.regex.Pattern
@@ -386,6 +392,49 @@ object HoodieSparkSqlTestBase {
.getActiveTimeline.getInstantDetails(cleanInstant).get)
}
+ def getMetaClientAndFileSystemView(basePath: String):
+ (HoodieTableMetaClient, SyncableFileSystemView) = {
+ val storageConf = HoodieTestUtils.getDefaultStorageConf
+ val metaClient: HoodieTableMetaClient =
+
HoodieTableMetaClient.builder.setConf(storageConf).setBasePath(basePath).build
+ val metadataConfig = HoodieMetadataConfig.newBuilder.build
+ val engineContext = new HoodieLocalEngineContext(storageConf)
+ val viewManager: FileSystemViewManager =
FileSystemViewManager.createViewManager(
+ engineContext, metadataConfig,
FileSystemViewStorageConfig.newBuilder.build,
+ HoodieCommonConfig.newBuilder.build,
+ (_: HoodieTableMetaClient) => {
+ HoodieTableMetadata.create(
+ engineContext, metaClient.getStorage, metadataConfig,
metaClient.getBasePath.toString)
+ }
+ )
+ val fsView: SyncableFileSystemView =
viewManager.getFileSystemView(metaClient)
+ (metaClient, fsView)
+ }
+
+ def validateDeleteLogBlockPrecombineNullOrZero(basePath: String): Unit = {
+ val (metaClient, fsView) = getMetaClientAndFileSystemView(basePath)
+ val fileSlice: Optional[FileSlice] =
fsView.getAllFileSlices("").findFirst()
+ assertTrue(fileSlice.isPresent)
+ val logFilePathList: java.util.List[String] =
HoodieTestUtils.getLogFileListFromFileSlice(fileSlice.get)
+ Collections.sort(logFilePathList)
+ var deleteLogBlockFound = false
+ val avroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema
+ for (i <- 0 until logFilePathList.size()) {
+ val logReader = new HoodieLogFileReader(
+ metaClient.getStorage, new HoodieLogFile(logFilePathList.get(i)),
+ avroSchema, 1024 * 1024, false, false,
+ "id", null)
+ assertTrue(logReader.hasNext)
+ val logBlock = logReader.next()
+ if (logBlock.isInstanceOf[HoodieDeleteBlock]) {
+ val deleteLogBlock = logBlock.asInstanceOf[HoodieDeleteBlock]
+ assertTrue(deleteLogBlock.getRecordsToDelete.forall(i =>
i.getOrderingValue() == 0 || i.getOrderingValue() == null))
+ deleteLogBlockFound = true
+ }
+ }
+ assertTrue(deleteLogBlockFound)
+ }
+
def validateTableConfig(storage: HoodieStorage,
basePath: String,
expectedConfigs: Map[String, String],
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestTableColumnTypeMismatch.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestTableColumnTypeMismatch.scala
new file mode 100644
index 00000000000..b9ecf2494ce
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestTableColumnTypeMismatch.scala
@@ -0,0 +1,816 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import org.apache.hudi.{DataSourceWriteOptions, ScalaAssertionSupport}
+
+import org.apache.spark.sql.AnalysisException
+import
org.apache.spark.sql.hudi.ErrorMessageChecker.isIncompatibleDataException
+import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
+
+class TestTableColumnTypeMismatch extends HoodieSparkSqlTestBase with
ScalaAssertionSupport {
+
+ test("Test Spark successful implicit type casting behaviors") {
+ withTempDir { tmp =>
+ // Define test cases for successful implicit casting
+ case class TypeCastTestCase(
+ sourceType: String,
+ targetType: String,
+ testValue: String,
+ expectedValue: Any,
+ description: String
+ )
+
+ val successfulTestCases = Seq(
+ // Numeric widening conversions (always safe)
+ TypeCastTestCase("tinyint", "smallint", "127", 127, "tinyint to
smallint widening"),
+ TypeCastTestCase("tinyint", "int", "127", 127, "tinyint to int
widening"),
+ TypeCastTestCase("tinyint", "bigint", "127", 127L, "tinyint to bigint
widening"),
+ TypeCastTestCase("tinyint", "float", "127", 127.0f, "tinyint to float
widening"),
+ TypeCastTestCase("tinyint", "double", "127", 127.0d, "tinyint to
double widening"),
+ TypeCastTestCase("tinyint", "decimal(10,1)", "127",
java.math.BigDecimal.valueOf(127.0), "tinyint to decimal widening"),
+
+ TypeCastTestCase("smallint", "int", "32767", 32767, "smallint to int
widening"),
+ TypeCastTestCase("smallint", "bigint", "32767", 32767L, "smallint to
bigint widening"),
+ TypeCastTestCase("smallint", "float", "32767", 32767.0f, "smallint to
float widening"),
+ TypeCastTestCase("smallint", "double", "32767", 32767.0d, "smallint to
double widening"),
+ TypeCastTestCase("smallint", "decimal(10,1)", "32767",
java.math.BigDecimal.valueOf(32767.0), "smallint to decimal widening"),
+
+ TypeCastTestCase("int", "bigint", "2147483647", 2147483647L, "int to
bigint widening"),
+ TypeCastTestCase("int", "float", "2147483647", 2147483647.0f, "int to
float widening"),
+ TypeCastTestCase("int", "double", "2147483647", 2147483647.0d, "int to
double widening"),
+ TypeCastTestCase("int", "decimal(10,1)", "22",
java.math.BigDecimal.valueOf(22.0), "int to decimal widening"),
+
+ // double value would have some epsilon error which is expected.
+ TypeCastTestCase("float", "double", "3.14", 3.140000104904175d, "float
to double widening"),
+ TypeCastTestCase("float", "decimal(10,2)", "3.14",
java.math.BigDecimal.valueOf(3.14).setScale(2, java.math.RoundingMode.HALF_UP),
"float to decimal"),
+
+ // Numeric narrowing conversions (potential data loss)
+ TypeCastTestCase("double", "int", "123.45", 123, "double to int -
truncates decimal"),
+ TypeCastTestCase("decimal(10,2)", "int", "123.45", 123, "decimal to
int - truncates decimal"),
+
+ // Boolean conversions
+ TypeCastTestCase("boolean", "string", "true", "true", "boolean to
string"),
+
+ // Timestamp/Date conversions
+ TypeCastTestCase("timestamp", "string", "timestamp'2023-01-01
12:00:00'", "2023-01-01 12:00:00", "timestamp to string"),
+ TypeCastTestCase("timestamp", "date", "timestamp'2023-01-01
12:00:00'", java.sql.Date.valueOf("2023-01-01"), "timestamp to date"),
+ TypeCastTestCase("date", "string", "date'2023-01-01'", "2023-01-01",
"date to string"),
+ TypeCastTestCase("date", "timestamp", "date'2023-01-01'",
java.sql.Timestamp.valueOf("2023-01-01 00:00:00"), "date to timestamp")
+ )
+
+ val tableName = generateTableName
+
+ // Create columns definition dynamically
+ val columnsDefinition = successfulTestCases.zipWithIndex.map { case
(test, idx) =>
+ s"col_${idx} ${test.targetType}"
+ }.mkString(",\n ")
+
+ // Create single table with all target type columns
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | $columnsDefinition,
+ | ts long
+ |) using hudi
+ |location '${tmp.getCanonicalPath}/$tableName'
+ |tblproperties (
+ | primaryKey = 'id',
+ | preCombineField = 'ts'
+ |)
+ """.stripMargin)
+
+ // Generate insert values
+ val insertValues = successfulTestCases.zipWithIndex.map { case (test,
idx) =>
+ s"cast(${test.testValue} as ${test.sourceType}) as col_${idx}"
+ }.mkString(",\n ")
+
+ // Insert all test values in one go
+ spark.sql(
+ s"""
+ |insert into $tableName
+ |select
+ | 1 as id,
+ | $insertValues,
+ | 1000 as ts
+ """.stripMargin)
+
+ // Verify each column value
+ val result = spark.sql("select "
+ + successfulTestCases.zipWithIndex.map { case (_, idx) => s"col_$idx"
}.mkString(",\n ")
+ + s" from $tableName where id = 1").collect()(0)
+ successfulTestCases.zipWithIndex.foreach { case (test, idx) =>
+ assert(result(idx) == test.expectedValue,
+ s"${test.description}: Expected ${test.expectedValue} but got
${result(idx)}")
+ }
+ }
+ }
+
test("Test Spark disallowed implicit type casting behaviors") {
  // Captures the current behavior of Spark's implicit type casting on INSERT:
  // every case below is expected to FAIL, either because the store-assignment
  // policy disallows the source-to-target cast outright, or because the value
  // overflows / cannot be parsed for the target type.
  withTempDir { tmp =>
    // Define test cases for implicit casting.
    // NOTE: `expectedValue` is never asserted in this test (all inserts are
    // expected to throw); it documents what the cast would yield if permitted.
    case class TypeCastTestCase(sourceType: String,
                                targetType: String,
                                testValue: String, // SQL literal expression
                                expectedValue: Any,
                                description: String = "")

    val testCases = Seq(
      TypeCastTestCase("int", "decimal(10,1)", "2147483647", java.math.BigDecimal.valueOf(2147483647.0), "int to decimal widening overflow"),

      // String conversions: rejected by the store-assignment policy even when
      // the literal itself is a valid value for the target type.
      TypeCastTestCase("string", "int", "'123'", 123, "string to int - disallowed cast"),
      TypeCastTestCase("string", "double", "'12.34'", 12.34d, "string to double - disallowed cast"),
      TypeCastTestCase("string", "double", "'abc'", null, "string to double - invalid numeric string"),
      TypeCastTestCase("string", "boolean", "'abc'", null, "string to boolean - invalid boolean string"),
      TypeCastTestCase("string", "timestamp", "'2023-01-01'", java.sql.Timestamp.valueOf("2023-01-01 00:00:00"), "string to timestamp - disallowed cast"),
      TypeCastTestCase("string", "date", "'2023-01-01'", java.sql.Date.valueOf("2023-01-01"), "string to date - disallowed cast"),

      // Numeric narrowing conversions (potential data loss)
      TypeCastTestCase("double", "int", s"${Int.MaxValue.toDouble + 1}", null, "double to int - overflow"),
      TypeCastTestCase("bigint", "int", "2147483648", null, "bigint to int - overflow"),

      // Boolean conversions
      TypeCastTestCase("boolean", "int", "true", 1, "boolean to int")
    )

    testCases.foreach { testCase =>
      val tableName = generateTableName

      // Create table with target type
      spark.sql(
        s"""
           |create table $tableName (
           |  id int,
           |  value ${testCase.targetType},
           |  ts long
           |) using hudi
           |location '${tmp.getCanonicalPath}/$tableName'
           |tblproperties (
           |  primaryKey = 'id',
           |  preCombineField = 'ts'
           |)
           """.stripMargin)

      // The insert must fail: the cast from sourceType to targetType is not
      // allowed implicitly (or the value does not fit the target type).
      val exception = intercept[Exception] {
        spark.sql(
          s"""
             |insert into $tableName
             |select 1 as id, cast(${testCase.testValue} as ${testCase.sourceType}) as value, 1000 as ts
             """.stripMargin)
      }

      val exceptionMsg = exception.getMessage
      val exceptionCauseMsg = Option(exception.getCause).map(_.getMessage).getOrElse("")
      assert(isIncompatibleDataException(exception),
        s"${testCase.description}: Expected casting related error but got different exception: " +
          s"Message from the exception ${exceptionMsg}, message from the exception cause ${exceptionCauseMsg}")
    }
  }
}
+
test("Test All Valid Type Casting For Merge Into and Insert") {
  // For all valid (implicitly allowed) type-casting pairs, exercise both the
  // MERGE INTO update path and the MERGE INTO insert path, and verify the
  // values land in the target table with the expected target-typed values.
  case class ColumnTypePair(sourceType: String,
                            targetType: String,
                            testValue: String,
                            expectedValue: Any,
                            columnName: String)

  // Valid type-casting pairs, derived from the successful cases of the
  // implicit-casting behavior test above.
  val validTypePairs = Seq(
    // Numeric widening pairs
    ColumnTypePair("tinyint", "smallint", "127", 127, "tiny_to_small"),
    ColumnTypePair("tinyint", "int", "127", 127, "tiny_to_int"),
    ColumnTypePair("tinyint", "bigint", "127", 127L, "tiny_to_big"),
    ColumnTypePair("tinyint", "float", "127", 127.0f, "tiny_to_float"),
    ColumnTypePair("tinyint", "double", "127", 127.0d, "tiny_to_double"),
    ColumnTypePair("tinyint", "decimal(10,1)", "127", java.math.BigDecimal.valueOf(127.0), "tiny_to_decimal"),

    ColumnTypePair("smallint", "int", "32767", 32767, "small_to_int"),
    ColumnTypePair("smallint", "bigint", "32767", 32767L, "small_to_big"),
    ColumnTypePair("smallint", "float", "32767", 32767.0f, "small_to_float"),
    ColumnTypePair("smallint", "double", "32767", 32767.0d, "small_to_double"),
    ColumnTypePair("smallint", "decimal(10,1)", "32767", java.math.BigDecimal.valueOf(32767.0), "small_to_decimal"),

    ColumnTypePair("int", "bigint", "2147483647", 2147483647L, "int_to_big"),
    ColumnTypePair("int", "float", "2147483647", 2147483647.0f, "int_to_float"),
    ColumnTypePair("int", "double", "2147483647", 2147483647.0d, "int_to_double"),
    ColumnTypePair("int", "decimal(10,1)", "22", java.math.BigDecimal.valueOf(22.0), "int_to_decimal"),

    ColumnTypePair("float", "double", "3.14", 3.140000104904175d, "float_to_double"),
    ColumnTypePair("float", "decimal(10,2)", "3.14", java.math.BigDecimal.valueOf(3.14).setScale(2, java.math.RoundingMode.HALF_UP), "float_to_decimal"),

    // Timestamp/Date conversions
    ColumnTypePair("timestamp", "string", "timestamp'2023-01-01 12:00:00'", "2023-01-01 12:00:00", "ts_to_string"),
    ColumnTypePair("timestamp", "date", "timestamp'2023-01-01 12:00:00'", java.sql.Date.valueOf("2023-01-01"), "ts_to_date"),
    ColumnTypePair("date", "string", "date'2023-01-01'", "2023-01-01", "date_to_string"),
    ColumnTypePair("date", "timestamp", "date'2023-01-01'", java.sql.Timestamp.valueOf("2023-01-01 00:00:00"), "date_to_ts"),

    // Boolean conversions
    ColumnTypePair("boolean", "string", "true", "true", "bool_to_string")
  )

  Seq("cow", "mor").foreach { tableType =>
    withTempDir { tmp =>
      val targetTable = generateTableName
      val sourceTable = generateTableName

      // Column definitions: target uses the wider types, source the narrower.
      val targetColumns = validTypePairs.map(p => s"${p.columnName} ${p.targetType}").mkString(",\n  ")
      val sourceColumns = validTypePairs.map(p => s"${p.columnName} ${p.sourceType}").mkString(",\n  ")

      // Create target table.
      spark.sql(
        s"""
           |create table $targetTable (
           |  id int,
           |  $targetColumns,
           |  ts long
           |) using hudi
           |location '${tmp.getCanonicalPath}/$targetTable'
           |tblproperties (
           |  type = '$tableType',
           |  primaryKey = 'id',
           |  preCombineField = 'ts'
           |)
           """.stripMargin)

      // Create source table
      spark.sql(
        s"""
           |create table $sourceTable (
           |  id int,
           |  $sourceColumns,
           |  ts long
           |) using hudi
           |location '${tmp.getCanonicalPath}/$sourceTable'
           |tblproperties (
           |  type = '$tableType',
           |  primaryKey = 'id',
           |  preCombineField = 'ts'
           |)
           """.stripMargin)

      // Insert initial data into target table (all value columns null, so the
      // subsequent MATCHED update path overwrites every column).
      val targetInsertValues = validTypePairs.map(_ => "null").mkString(", ")
      spark.sql(
        s"""
           |insert into $targetTable
           |select 1 as id, $targetInsertValues, 1000 as ts
           """.stripMargin)

      // Insert data into source table with test values
      val sourceValues = validTypePairs.map(p => s"cast(${p.testValue} as ${p.sourceType})").mkString(", ")
      spark.sql(
        s"""
           |insert into $sourceTable
           |select 1 as id, $sourceValues, 1001 as ts
           """.stripMargin)

      // Perform merge operation (id=1 matches -> update path)
      spark.sql(
        s"""
           |merge into $targetTable t
           |using $sourceTable s
           |on t.id = s.id
           |when matched then update set *
           |when not matched then insert *
           """.stripMargin)

      // Verify results: only the value columns are selected, so result
      // index idx maps directly to validTypePairs(idx) — no offset needed.
      val c = validTypePairs.map(p => s"${p.columnName}").mkString(",\n  ")
      val result = spark.sql(s"select $c from $targetTable where id = 1").collect()(0)
      validTypePairs.zipWithIndex.foreach { case (pair, idx) =>
        val actualValue = result.get(idx)
        assert(actualValue == pair.expectedValue,
          s"${tableType.toUpperCase}: Column ${pair.columnName} - Expected ${pair.expectedValue} (${pair.expectedValue.getClass}) but got $actualValue (${if (actualValue != null) actualValue.getClass else "null"})")
      }

      // Test insert case: a new key (id=2) exercises the NOT MATCHED path.
      spark.sql(
        s"""
           |insert into $sourceTable
           |select 2 as id, $sourceValues, 1002 as ts
           """.stripMargin)

      spark.sql(
        s"""
           |merge into $targetTable t
           |using $sourceTable s
           |on t.id = s.id
           |when matched then update set *
           |when not matched then insert *
           """.stripMargin)

      // Verify inserted row. Select id plus the value columns explicitly
      // (instead of `select *`) so column positions are deterministic;
      // idx + 1 skips the leading id column. Bug fix: the original compared
      // with `!=`, which contradicted the failure message and made this
      // verification pass vacuously.
      val result2 = spark.sql(s"select id, $c from $targetTable where id = 2").collect()(0)
      validTypePairs.zipWithIndex.foreach { case (pair, idx) =>
        val actualValue = result2.get(idx + 1)
        assert(actualValue == pair.expectedValue,
          s"${tableType.toUpperCase}: Insert - Column ${pair.columnName} - Expected ${pair.expectedValue} (${pair.expectedValue.getClass}) but got $actualValue (${if (actualValue != null) actualValue.getClass else "null"})")
      }
    }
  }
}
+
test("Test Column Type Mismatches for MergeInto Delete Actions") {
  // Verifies how MERGE INTO ... DELETE behaves when the source columns used
  // in the merge condition (partition column / primary key) have types that
  // differ from the target's: castable mismatches succeed, non-castable ones
  // fail with a type-mismatch error.
  // NOTE(review): only "mor" is exercised here — presumably "cow" coverage
  // exists elsewhere; confirm before extending.
  Seq("mor").foreach { tableType =>
    withTempDir { tmp =>
      // Creates a target table partitioned by `partitionCol` of the given
      // type and returns its generated name.
      def createTargetTable(partitionCol: String, partitionType: String): String = {
        val targetTable = generateTableName
        spark.sql(
          s"""
             |create table $targetTable (
             |  id long,
             |  name string,
             |  value_double double,
             |  ts long,
             |  $partitionCol $partitionType
             |) using hudi
             |partitioned by ($partitionCol)
             |location '${tmp.getCanonicalPath}/$targetTable'
             |tblproperties (
             |  type = '$tableType',
             |  primaryKey = 'id',
             |  preCombineField = 'ts'
             |)
             """.stripMargin)
        targetTable
      }

      // Scenario 1: Successful merge with partition column (both partition and pk can be cast)
      {
        val targetTable = createTargetTable("part_col", "long")

        // Insert initial data into target table
        spark.sql(
          s"""
             |insert into $targetTable
             |select
             |  cast(id as long) as id,
             |  name,
             |  value_double,
             |  ts,
             |  cast(part_col as long) as part_col
             |from (
             |  select 1 as id, 'record1' as name, 1.1 as value_double, 1000 as ts, 100 as part_col
             |  union all
             |  select 2 as id, 'record2' as name, 2.2 as value_double, 1000 as ts, 200 as part_col
             |)
             """.stripMargin)

        // Merge using inline subquery instead of source table; the int-typed
        // id/part_col on the source side are castable to the target's longs.
        spark.sql(
          s"""
             |merge into $targetTable t
             |using (
             |  select
             |    cast(1 as int) as id,
             |    cast('updated1' as string) as name,
             |    cast(1.11 as double) as value_double,
             |    cast(1001 as long) as ts,
             |    cast(100 as int) as part_col,
             |    cast('Y' as string) as delete_flag
             |) s
             |on t.id = s.id and t.part_col = s.part_col
             |when matched and s.delete_flag = 'Y' then delete
             """.stripMargin)

        // Record 1 was deleted; only record 2 remains.
        checkAnswer(s"select id, name, value_double, ts, part_col from $targetTable order by id")(
          Seq(2L, "record2", 2.2, 1000L, 200L))
      }

      // Scenario 2: Partition column type not cast-able (boolean vs date)
      {
        val targetTable = createTargetTable("part_col", "boolean")

        // Insert initial data into target table with boolean partition
        spark.sql(
          s"""
             |insert into $targetTable
             |select
             |  cast(id as long) as id,
             |  name,
             |  value_double,
             |  ts,
             |  true as part_col
             |from (
             |  select 1 as id, 'record1' as name, 1.1 as value_double, 1000 as ts
             |)
             """.stripMargin)

        val sourceTableSubQuery =
          s"""
             | select
             |   cast(1 as int) as id,
             |   cast('updated1' as string) as name,
             |   cast(1.11 as double) as value_double,
             |   cast(1001 as long) as ts,
             |   cast('2024-01-01' as date) as part_col,
             |   cast('Y' as string) as delete_flag
             |""".stripMargin
        // Should fail with cast related error due to incompatible partition
        // types. Two message variants are accepted to cover different Spark
        // versions' analyzer error wording.
        val e1 = intercept[Exception] {
          spark.sql(
            s"""
               |merge into $targetTable t
               |using ($sourceTableSubQuery) s
               |on t.id = s.id and t.part_col = s.part_col
               |when matched and s.delete_flag = 'Y' then delete
               """.stripMargin)
        }
        assert(
          e1.getMessage.contains(
            "the left and right operands of the binary operator have incompatible types " +
              "(\"BOOLEAN\" and \"DATE\")")
            || e1.getMessage.contains(
            "cannot resolve '(t.part_col = s.part_col)' due to data type mismatch: differing types" +
              " in '(t.part_col = s.part_col)' (boolean and date)."))

        // Same merge without the partition-column predicate: resolves fine,
        // but the delete must not remove the row (the source's part_col does
        // not identify the target partition).
        spark.sql(
          s"""
             |merge into $targetTable t
             |using ($sourceTableSubQuery) s
             |on t.id = s.id
             |when matched and s.delete_flag = 'Y' then delete
             """.stripMargin)

        checkAnswer(s"select id, name, value_double, ts, part_col from $targetTable order by id")(
          Seq(1L, "record1", 1.1, 1000L, true))
      }

      // Scenario 3: Failed merge due to primary key type mismatch
      // (was mislabeled "Scenario 4" — there are only three scenarios here)
      {
        val targetTable = createTargetTable("part_col", "long")

        // Insert initial data
        spark.sql(
          s"""
             |insert into $targetTable
             |select
             |  cast(id as long) as id,
             |  name,
             |  value_double,
             |  ts,
             |  part_col
             |from (
             |  select 1 as id, 'record1' as name, 1.1 as value_double, 1000 as ts, 100 as part_col
             |)
             """.stripMargin)

        // Double-typed source id cannot be cast to the long target key.
        val e2 = intercept[Exception] {
          spark.sql(
            s"""
               |merge into $targetTable t
               |using (
               |  select
               |    cast(1.0 as double) as id,
               |    cast('updated1' as string) as name,
               |    cast(1.11 as double) as value_double,
               |    cast(1001 as long) as ts,
               |    cast(100 as long) as part_col,
               |    cast('Y' as string) as delete_flag
               |) s
               |on t.id = s.id
               |when matched and s.delete_flag = 'Y' then delete
               """.stripMargin)
        }
        assert(e2.getMessage.contains("Invalid MERGE INTO matching condition: s.id: can't cast s.id (of DoubleType) to LongType"))
      }
    }
  }
}
+
test("Test Column Type Mismatches for MergeInto Insert and Update Actions") {
  // Verifies that MERGE INTO rejects source data whose partition-key,
  // primary-key, or precombine-field type differs from the target table's,
  // for both the UPDATE and the INSERT action, with a descriptive error.
  case class TypeMismatchTestCase(
      description: String,
      targetSchema: Seq[(String, String)], // (colName, colType)
      sourceSchema: Seq[(String, String)],
      partitionCols: Seq[String],
      primaryKey: String,
      preCombineField: String,
      tableType: String, // COW or MOR
      expectedErrorPattern: String
  )

  val testCases = Seq(
    TypeMismatchTestCase(
      description = "Partition column type mismatch",
      targetSchema = Seq(
        "id" -> "int",
        "name" -> "string",
        "price" -> "int",
        "ts" -> "long"
      ),
      sourceSchema = Seq(
        "id" -> "int",
        "name" -> "int", // mismatched type
        "price" -> "int",
        "ts" -> "long"
      ),
      partitionCols = Seq("name", "price"),
      primaryKey = "id",
      preCombineField = "ts",
      tableType = "cow",
      expectedErrorPattern = "Partition key data type mismatch between source table and target table. Target table uses StringType for column 'name', source table uses IntegerType for 's0.name'"
    ),
    TypeMismatchTestCase(
      description = "Primary key type mismatch",
      targetSchema = Seq(
        "id" -> "int",
        "name" -> "string",
        "price" -> "int",
        "ts" -> "long"
      ),
      sourceSchema = Seq(
        "id" -> "long", // mismatched type
        "name" -> "string",
        "price" -> "int",
        "ts" -> "long"
      ),
      partitionCols = Seq("name", "price"),
      primaryKey = "id",
      preCombineField = "ts",
      tableType = "mor",
      expectedErrorPattern = "Primary key data type mismatch between source table and target table. Target table uses IntegerType for column 'id', source table uses LongType for 's0.id'"
    ),
    TypeMismatchTestCase(
      description = "Precombine field type mismatch",
      targetSchema = Seq(
        "id" -> "int",
        "name" -> "string",
        "price" -> "int",
        "ts" -> "long"
      ),
      sourceSchema = Seq(
        "id" -> "int",
        "name" -> "string",
        "price" -> "int",
        "ts" -> "int" // mismatched type
      ),
      partitionCols = Seq("name", "price"),
      primaryKey = "id",
      preCombineField = "ts",
      tableType = "cow",
      expectedErrorPattern = "Precombine field data type mismatch between source table and target table. Target table uses LongType for column 'ts', source table uses IntegerType for 's0.ts'"
    )
  )

  // Helper: creates a hudi table with the given schema/partitioning/keys.
  def createTable(tableName: String, schema: Seq[(String, String)], partitionCols: Seq[String],
                  primaryKey: String, preCombineField: String, tableType: String, location: String): Unit = {
    val schemaStr = schema.map { case (name, dataType) => s"$name $dataType" }.mkString(",\n  ")
    val partitionColsStr = if (partitionCols.nonEmpty) s"partitioned by (${partitionCols.mkString(", ")})" else ""

    spark.sql(
      s"""
         |create table $tableName (
         |  $schemaStr
         |) using hudi
         |$partitionColsStr
         |location '$location'
         |tblproperties (
         |  type = '$tableType',
         |  primaryKey = '$primaryKey',
         |  preCombineField = '$preCombineField'
         |)
         """.stripMargin)
  }

  // Run test cases. Partial updates are disabled so the full-row UPDATE /
  // INSERT validation path is exercised.
  testCases.foreach { testCase =>
    withSparkSqlSessionConfig(s"${DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key}" -> "false") {
      withTempDir { tmp =>
        val targetTable = generateTableName

        // Create only target table; the source side is an inline subquery
        // whose column types come from testCase.sourceSchema casts.
        createTable(
          targetTable,
          testCase.targetSchema,
          testCase.partitionCols,
          testCase.primaryKey,
          testCase.preCombineField,
          testCase.tableType,
          s"${tmp.getCanonicalPath}/$targetTable"
        )

        // Insert sample data into target table
        spark.sql(
          s"""
             |insert into $targetTable
             |select 1 as id, 'John Doe' as name, 19 as price, 1598886000 as ts
             |union all
             |select 2, 'Jane Doe', 24, 1598972400
             """.stripMargin)

        // Test UPDATE action with inline subquery (id=1 matches)
        val updateQuery =
          s"""
             |merge into $targetTable t
             |using (
             |  select
             |    cast(1 as ${testCase.sourceSchema.find(_._1 == "id").get._2}) as id,
             |    cast('John Doe' as ${testCase.sourceSchema.find(_._1 == "name").get._2}) as name,
             |    cast(20 as ${testCase.sourceSchema.find(_._1 == "price").get._2}) as price,
             |    cast(1598886001 as ${testCase.sourceSchema.find(_._1 == "ts").get._2}) as ts
             |) s0
             |on t.${testCase.primaryKey} = s0.${testCase.primaryKey}
             |when matched then update set *
             """.stripMargin

        val updateError = intercept[AnalysisException] {
          spark.sql(updateQuery)
        }.getMessage

        assert(updateError.contains(testCase.expectedErrorPattern),
          s"UPDATE - Expected error pattern '${testCase.expectedErrorPattern}' not found in actual error: $updateError")

        // Test INSERT action with inline subquery (id=3 does not match)
        val insertQuery =
          s"""
             |merge into $targetTable t
             |using (
             |  select
             |    cast(3 as ${testCase.sourceSchema.find(_._1 == "id").get._2}) as id,
             |    cast('Bob Smith' as ${testCase.sourceSchema.find(_._1 == "name").get._2}) as name,
             |    cast(30 as ${testCase.sourceSchema.find(_._1 == "price").get._2}) as price,
             |    cast(1598886002 as ${testCase.sourceSchema.find(_._1 == "ts").get._2}) as ts
             |) s0
             |on t.${testCase.primaryKey} = s0.${testCase.primaryKey}
             |when not matched then insert *
             """.stripMargin

        val insertError = intercept[AnalysisException] {
          spark.sql(insertQuery)
        }.getMessage

        assert(insertError.contains(testCase.expectedErrorPattern),
          s"INSERT - Expected error pattern '${testCase.expectedErrorPattern}' not found in actual error: $insertError")
      }
    }
  }
}
+
test("Test MergeInto with partition column type mismatch should throw") {
  withSparkSqlSessionConfig(s"${DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key}" -> "false") {
    withTempDir { tmp =>
      val targetTable = generateTableName

      // Create target table partitioned by a long column ('name').
      // (Original comment said "string partition", which was incorrect.)
      spark.sql(
        s"""
           |create table $targetTable (
           |  id int,
           |  name long,
           |  ts int
           |) using hudi
           |partitioned by (name)
           |location '${tmp.getCanonicalPath}/$targetTable'
           |tblproperties (
           |  type = 'cow',
           |  primaryKey = 'id',
           |  preCombineField = 'ts'
           |)
           """.stripMargin)

      // Insert sample data
      spark.sql(
        s"""
           |insert into $targetTable
           |select 1 as id, 124L as name, 1000 as ts
           """.stripMargin)

      // The source supplies an int for the long partition column 'name';
      // the merge must be rejected with a type-mismatch error.
      val e = intercept[AnalysisException] {
        spark.sql(
          s"""
             |merge into $targetTable t
             |using (
             |  select
             |    cast(1 as int) as id,
             |    cast(123 as int) as name,
             |    cast(1001 as long) as ts
             |) s
             |on t.id = s.id
             |when matched then update set name = s.name
             """.stripMargin)
      }
      assert(e.getMessage.contains("data type mismatch between source table and target table"))
    }
  }
}
+
test("Test MergeInto with precombine column type mismatch behavior based on record.merge.mode") {
  // Precombine-field type mismatch is fatal only when the merge mode actually
  // relies on the precombine value: EVENT_TIME_ORDERING must throw, while
  // COMMIT_TIME_ORDERING ignores the precombine field and succeeds.
  withSparkSqlSessionConfig(s"${DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key}" -> "false") {
    withTempDir { tmp =>
      Seq("EVENT_TIME_ORDERING", "COMMIT_TIME_ORDERING").foreach { mergeMode =>
        val targetTable = generateTableName

        // Create target table with int ts (the source will supply a long ts)
        spark.sql(
          s"""
             |create table $targetTable (
             |  id int,
             |  name string,
             |  ts int
             |) using hudi
             |partitioned by (name)
             |location '${tmp.getCanonicalPath}/$targetTable'
             |tblproperties (
             |  type = 'cow',
             |  primaryKey = 'id',
             |  preCombineField = 'ts',
             |  'hoodie.record.merge.mode' = '$mergeMode'
             |)
             """.stripMargin)

        // Insert sample data
        spark.sql(
          s"""
             |insert into $targetTable
             |select 1 as id, 'John' as name, 1000 as ts
             """.stripMargin)

        if (mergeMode == "EVENT_TIME_ORDERING") {
          // Should throw exception for EVENT_TIME_ORDERING
          val e = intercept[AnalysisException] {
            spark.sql(
              s"""
                 |merge into $targetTable t
                 |using (
                 |  select
                 |    cast(1 as int) as id,
                 |    cast('John' as string) as name,
                 |    cast(1001 as long) as ts
                 |) s
                 |on t.id = s.id
                 |when matched then update set ts = s.ts
                 """.stripMargin)
          }
          assert(e.getMessage.contains("data type mismatch between source table and target table"))
        } else {
          // Should succeed for COMMIT_TIME_ORDERING
          spark.sql(
            s"""
               |merge into $targetTable t
               |using (
               |  select
               |    cast(1 as int) as id,
               |    cast('John' as string) as name,
               |    cast(1001 as long) as ts
               |) s
               |on t.id = s.id
               |when matched then update set ts = s.ts
               """.stripMargin)

          // Verify the update succeeded
          checkAnswer(s"select id, name, ts from $targetTable where id = 1")(
            Seq(1, "John", 1001)
          )
        }
      }
    }
  }
}
+}
+
object ErrorMessageChecker {
  // Substrings that identify cast / incompatible-data failures raised by
  // Spark or Hudi during writes. Matching is plain substring containment.
  private val incompatibleDataPatterns = Set(
    "Cannot write incompatible data to table",
    "overflow",
    "cannot be cast",
    "Cannot safely cast",
    "Conversion of",
    "Failed to parse",
    "cannot be represented as Decimal"
  )

  /**
   * Returns true if the message matches any known incompatible-data pattern.
   *
   * Null-safe (bug fix): exceptions frequently carry a null message, and the
   * original `message.contains` call would throw a NullPointerException; a
   * null message now simply matches nothing.
   */
  def containsIncompatibleDataError(message: String): Boolean = {
    message != null && incompatibleDataPatterns.exists(message.contains)
  }

  /**
   * Returns true if the exception — or its immediate cause, when present —
   * carries an incompatible-data error message.
   */
  def isIncompatibleDataException(exception: Exception): Boolean = {
    containsIncompatibleDataError(exception.getMessage) ||
      Option(exception.getCause)
        .exists(cause => containsIncompatibleDataError(cause.getMessage))
  }
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestAlterTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestAlterTable.scala
index 6bd084411cd..d32cbbd435e 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestAlterTable.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestAlterTable.scala
@@ -26,7 +26,6 @@ import
org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serializeComm
import org.apache.hudi.table.HoodieSparkTable
import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
-import org.apache.hadoop.fs.Path
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
@@ -123,7 +122,7 @@ class TestAlterTable extends HoodieSparkSqlTestBase {
s"""
|merge into $newTableName t0
|using (
- | select 1 as id, 'a1' as name, 12 as price, 1001 as ts, 'e0' as
ext0
+ | select 1 as id, 'a1' as name, 12 as price, 1001L as ts, 'e0'
as ext0
|) s0
|on t0.id = s0.id
|when matched then update set *
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
index 3942af2b145..7c1bc0aa2a2 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
@@ -214,7 +214,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
s"""
|merge into ${targetTable} as target
|using (
- |select '2' as id, 'bb' as name, 456 as dt, '2024-02-19' as `day`,
10 as `hour`
+ |select '2' as id, 'bb' as name, 456L as dt, '2024-02-19' as `day`,
10 as `hour`
|) as source
|on target.id = source.id
|when matched then update set *
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoLogOnlyTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoLogOnlyTable.scala
index b8ab95e7653..1ecfa4e98ff 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoLogOnlyTable.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoLogOnlyTable.scala
@@ -58,7 +58,7 @@ class TestMergeIntoLogOnlyTable extends
HoodieSparkSqlTestBase {
s"""
|merge into $tableName h0
|using (
- | select 1 as id, 'a1' as name, 11 as price, 1001 as ts
+ | select 1 as id, 'a1' as name, 11 as price, 1001L as ts
| ) s0
| on h0.id = s0.id
| when matched then update set *
@@ -74,7 +74,7 @@ class TestMergeIntoLogOnlyTable extends
HoodieSparkSqlTestBase {
s"""
|merge into $tableName h0
|using (
- | select 4 as id, 'a4' as name, 11 as price, 1000 as ts
+ | select 4 as id, 'a4' as name, 11 as price, 1000L as ts
| ) s0
| on h0.id = s0.id
| when not matched then insert *
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable.scala
index daf07b8e0f0..d2f16599db3 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable.scala
@@ -47,7 +47,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with
ScalaAssertionSuppo
| id int,
| name string,
| price double,
- | ts long
+ | ts int
|) using hudi
| location '${tmp.getCanonicalPath}'
| tblproperties (
@@ -63,7 +63,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with
ScalaAssertionSuppo
StructField("id", IntegerType, nullable = true),
StructField("name", StringType, nullable = true),
StructField("price", DoubleType, nullable = true),
- StructField("ts", LongType, nullable = true))
+ StructField("ts", IntegerType, nullable = true))
// First merge with a extra input field 'flag' (insert a new record)
spark.sql(
s"""
@@ -155,7 +155,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase
with ScalaAssertionSuppo
| name string,
| data int,
| country string,
- | ts bigint
+ | ts int
|) using hudi
|tblproperties (
| type = 'cow',
@@ -169,7 +169,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase
with ScalaAssertionSuppo
s"""
|merge into $targetTable as target
|using (
- |select 1 as id, 'lb' as name, 6 as data, 'shu' as country,
1646643193 as ts
+ |select 1 as id, 'lb' as name, 6 as data, 'shu' as country, 43193
as ts
|) source
|on source.id = target.id
|when matched then
@@ -181,7 +181,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase
with ScalaAssertionSuppo
s"""
|merge into $targetTable as target
|using (
- |select 1 as id, 'lb' as name, 5 as data, 'shu' as country,
1646643196 as ts
+ |select 1 as id, 'lb' as name, 5 as data, 'shu' as country, 43196
as ts
|) source
|on source.id = target.id
|when matched and source.data > target.data then
@@ -193,7 +193,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase
with ScalaAssertionSuppo
|""".stripMargin)
checkAnswer(s"select id, name, data, country, ts from $targetTable")(
- Seq(1, "lb", 5, "shu", 1646643196L)
+ Seq(1, "lb", 5, "shu", 43196)
)
}
}
@@ -285,7 +285,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase
with ScalaAssertionSuppo
| id int,
| name string,
| price double,
- | ts long
+ | ts int
| ) using parquet
| location '${tmp.getCanonicalPath}/$sourceTable'
""".stripMargin)
@@ -296,7 +296,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase
with ScalaAssertionSuppo
| id int,
| name string,
| price double,
- | ts long
+ | ts int
|) using hudi
| location '${tmp.getCanonicalPath}/$targetTable'
| tblproperties (
@@ -447,7 +447,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase
with ScalaAssertionSuppo
| id int,
| name string,
| price double,
- | ts long,
+ | ts int,
| dt string
| ) using hudi
| tblproperties (
@@ -462,7 +462,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase
with ScalaAssertionSuppo
StructField("id", IntegerType, nullable = true),
StructField("name", StringType, nullable = true),
StructField("price", DoubleType, nullable = true),
- StructField("ts", LongType, nullable = true),
+ StructField("ts", IntegerType, nullable = true),
StructField("dt", StringType, nullable = true))
// Insert data
@@ -470,7 +470,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase
with ScalaAssertionSuppo
s"""
| merge into $tableName as t0
| using (
- | select 1 as id, 'a1' as name, 10 as price, 1000L as ts,
'2021-03-21' as dt
+ | select 1 as id, 'a1' as name, 10 as price, 1000 as ts,
'2021-03-21' as dt
| ) as s0
| on t0.id = s0.id
| when not matched and s0.id % 2 = 1 then insert *
@@ -485,7 +485,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase
with ScalaAssertionSuppo
s"""
| merge into $tableName as t0
| using (
- | select 1 as _id, 'a1' as name, 12 as _price, 1001L as _ts,
'2021-03-21' as dt
+ | select 1 as _id, 'a1' as name, 12 as _price, 1001 as _ts,
'2021-03-21' as dt
| ) as s0
| on t0.id = s0._id
| when matched and s0._id % 2 = 0 then update set
@@ -501,7 +501,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase
with ScalaAssertionSuppo
s"""
| merge into $tableName as t0
| using (
- | select 1 as _id, 'a1' as name, 12 as _price, 1001L as _ts,
'2021-03-21' as dt
+ | select 1 as _id, 'a1' as name, 12 as _price, 1001 as _ts,
'2021-03-21' as dt
| ) as s0
| on t0.id = s0._id
| when matched and s0._id % 2 = 1 then update set
@@ -517,7 +517,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase
with ScalaAssertionSuppo
s"""
| merge into $tableName as t0
| using (
- | select 2 as id, 'a2' as name, 10 as price, 1000L as ts,
'2021-03-21' as dt
+ | select 2 as id, 'a2' as name, 10 as price, 1000 as ts,
'2021-03-21' as dt
| ) as s0
| on t0.id = s0.id
| when not matched and s0.id % 2 = 0 then insert *
@@ -532,7 +532,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase
with ScalaAssertionSuppo
s"""
| merge into $tableName t0
| using (
- | select 2 as s_id, 'a2' as s_name, 15 as s_price, 1001L as s_ts,
'2021-03-21' as dt
+ | select 2 as s_id, 'a2' as s_name, 15 as s_price, 1001 as s_ts,
'2021-03-21' as dt
| ) s0
| on t0.id = s0.s_id
| when matched and s_ts = 1001
@@ -552,7 +552,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase
with ScalaAssertionSuppo
s"""
| merge into $tableName t0
| using (
- | select 1 as s_id, 'a2' as s_name, 15 as s_price, 1001L as s_ts,
'2021-03-21' as dt
+ | select 1 as s_id, 'a2' as s_name, 15 as s_price, 1001 as s_ts,
'2021-03-21' as dt
| ) s0
| on t0.id = s0.s_id + 1
| when matched and s_ts = 1001 then delete
@@ -563,7 +563,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase
with ScalaAssertionSuppo
s"""
| merge into $tableName t0
| using (
- | select 2 as s_id, 'a2' as s_name, 15 as s_price, 1001L as ts,
'2021-03-21' as dt
+ | select 2 as s_id, 'a2' as s_name, 15 as s_price, 1001 as ts,
'2021-03-21' as dt
| ) s0
| on t0.id = s0.s_id
| when matched and s0.ts = 1001 then delete
@@ -584,7 +584,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase
with ScalaAssertionSuppo
spark.sql(
s"""
| create table $tableName (
- | id bigint,
+ | id int,
| name string,
| price double,
| dt string
@@ -644,7 +644,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase
with ScalaAssertionSuppo
| id int,
| name string,
| price double,
- | v long,
+ | v int,
| dt string
| ) using hudi
| tblproperties (
@@ -718,7 +718,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase
with ScalaAssertionSuppo
| id int,
| name string,
| price double,
- | v long,
+ | v int,
| dt string
| ) using hudi
| tblproperties (
@@ -754,8 +754,6 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase
with ScalaAssertionSuppo
//
// 2) set source column name to be different with target column
//
- val errorMessage = "Failed to resolve precombine field `v` w/in the
source-table output"
-
checkException(
s"""
| merge into $tableName1 as t0
@@ -765,7 +763,9 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase
with ScalaAssertionSuppo
| on t0.id = s0.s_id
| when matched then update set id=s0.s_id, name=s0.s_name,
price=s0.s_price*2, v=s0.s_v+2, dt=s0.dt
""".stripMargin
- )(errorMessage)
+ )(
+ "MERGE INTO field resolution error: " +
+ "Failed to resolve precombine field `v` w/in the source-table
output")
spark.sql(
s"""
@@ -795,7 +795,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase
with ScalaAssertionSuppo
| id int,
| name string,
| price double,
- | v long,
+ | v int,
| dt string
| ) using hudi
| tblproperties (
@@ -815,8 +815,6 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase
with ScalaAssertionSuppo
// Delete data with a condition expression on primaryKey field
// 1) set source column name to be same as target column
//
- val complexConditionsErrorMessage = "Only simple conditions of the form
`t.id = s.id` are allowed on the primary-key and partition path column. Found
`t0.id = (s0.id + 1)`"
-
checkException(
s"""merge into $tableName1 t0
| using (
@@ -824,7 +822,9 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase
with ScalaAssertionSuppo
| ) s0
| on t0.id = s0.id + 1
| when matched then delete
- """.stripMargin)(complexConditionsErrorMessage)
+ """.stripMargin)(
+ "Only simple conditions of the form `t.id = s.id` are allowed on the
primary-key "
+ + "and partition path column. Found `t0.id = (s0.id + 1)`")
spark.sql(
s"""
@@ -844,8 +844,6 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase
with ScalaAssertionSuppo
//
// 2.a) set source column name to be different with target column
(should fail unable to match precombine field)
//
- val failedToResolveErrorMessage = "Failed to resolve precombine field
`v` w/in the source-table output"
-
checkException(
s"""merge into $tableName1 t0
| using (
@@ -853,7 +851,9 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase
with ScalaAssertionSuppo
| ) s0
| on t0.id = s0.s_id
| when matched then delete
- |""".stripMargin)(failedToResolveErrorMessage)
+ |""".stripMargin)(
+ "MERGE INTO field resolution error: "
+ + "Failed to resolve precombine field `v` w/in the source-table
output")
//
// 2.b) set source column name to be different with target column
@@ -1079,7 +1079,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase
with ScalaAssertionSuppo
s"""
| merge into $tableName
| using (
- | select 1 as id, 'a1' as name, 10 as price, $dataValue as c,
'1' as flag
+ | select 1 as id, 'a1' as name, 10 as price, cast($dataValue as
$dataType) as c, '1' as flag
| ) s0
| on s0.id = $tableName.id
| when matched and flag = '1' then update set *
@@ -1092,7 +1092,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase
with ScalaAssertionSuppo
s"""
| merge into $tableName
| using (
- | select 1 as id, 'a1' as name, 10 as price, $dataValue as c
+ | select 1 as id, 'a1' as name, 10 as price, cast($dataValue as
$dataType) as c
| ) s0
| on s0.id = $tableName.id
| when matched then update set
@@ -1117,7 +1117,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase
with ScalaAssertionSuppo
| id int,
| name string,
| price double,
- | ts long
+ | ts int
|) using hudi
| location '${tmp.getCanonicalPath}'
| tblproperties (
@@ -1131,7 +1131,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase
with ScalaAssertionSuppo
StructField("id", IntegerType, nullable = true),
StructField("name", StringType, nullable = true),
StructField("price", DoubleType, nullable = true),
- StructField("ts", LongType, nullable = true))
+ StructField("ts", IntegerType, nullable = true))
spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)")
@@ -1188,7 +1188,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase
with ScalaAssertionSuppo
| id int,
| name string,
| value $dataType,
- | ts long
+ | ts int
|) using hudi
| location '${tmp.getCanonicalPath}/$tableName'
| tblproperties (
@@ -1235,7 +1235,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase
with ScalaAssertionSuppo
| id int,
| name string,
| value $dataType,
- | ts long
+ | ts int
|) using hudi
| location '${tmp.getCanonicalPath}/$tableName'
| tblproperties (
@@ -1269,7 +1269,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase
with ScalaAssertionSuppo
| id int,
| name string,
| value int,
- | ts long
+ | ts int
|) using hudi
| location '${tmp.getCanonicalPath}/$tableName'
| tblproperties (
@@ -1362,7 +1362,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase
with ScalaAssertionSuppo
| id int,
| name string,
| value int,
- | ts long
+ | ts int
|) using hudi
| location '${tmp.getCanonicalPath}/$tableName'
| tblproperties (
@@ -1373,45 +1373,6 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase
with ScalaAssertionSuppo
spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
- // Can't down-cast incoming dataset's primary-key w/o loss of precision
(should fail)
- val errorMsg = "Invalid MERGE INTO matching condition: s0.id: can't cast
s0.id (of LongType) to IntegerType"
-
- checkExceptionContain(
- s"""
- |merge into $tableName h0
- |using (
- | select cast(1 as long) as id, 1001 as ts
- | ) s0
- | on cast(h0.id as long) = s0.id
- | when matched then update set h0.ts = s0.ts
- |""".stripMargin)(errorMsg)
-
- // Can't down-cast incoming dataset's primary-key w/o loss of precision
(should fail)
- checkExceptionContain(
- s"""
- |merge into $tableName h0
- |using (
- | select cast(1 as long) as id, 1002 as ts
- | ) s0
- | on h0.id = s0.id
- | when matched then update set h0.ts = s0.ts
- |""".stripMargin)(errorMsg)
-
- // Can up-cast incoming dataset's primary-key w/o loss of precision
(should succeed)
- spark.sql(
- s"""
- |merge into $tableName h0
- |using (
- | select cast(1 as short) as id, 1003 as ts
- | ) s0
- | on h0.id = s0.id
- | when matched then update set h0.ts = s0.ts
- |""".stripMargin)
-
- checkAnswer(s"select id, name, value, ts from $tableName")(
- Seq(1, "a1", 10, 1003)
- )
-
// Can remove redundant symmetrical casting on both sides (should
succeed)
spark.sql(
s"""
@@ -1439,7 +1400,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase
with ScalaAssertionSuppo
spark.sql(
s"""
| create table $tableName (
- | id bigint,
+ | id int,
| name string,
| price double,
| dt string
@@ -1497,7 +1458,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase
with ScalaAssertionSuppo
spark.sql(
s"""
| create table $tableName (
- | id bigint,
+ | id int,
| name string,
| price double,
| ts bigint,
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable2.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable2.scala
index bd8f7676e02..ea19f9812b7 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable2.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable2.scala
@@ -41,7 +41,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
| id int,
| name string,
| price double,
- | ts long,
+ | ts int,
| dt string
| ) using hudi
| tblproperties (
@@ -57,7 +57,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
s"""
| merge into $tableName as t0
| using (
- | select 1 as id, 'a1' as name, 10 as price, 1000L as ts,
'2021-03-21' as dt
+ | select 1 as id, 'a1' as name, 10 as price, 1000 as ts,
'2021-03-21' as dt
| ) as s0
| on t0.id = s0.id
| when not matched and s0.id % 2 = 1 then insert *
@@ -72,7 +72,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
s"""
| merge into $tableName as t0
| using (
- | select 2 as id, 'a2' as name, 10 as price, 1000L as ts,
'2021-03-21' as dt
+ | select 2 as id, 'a2' as name, 10 as price, 1000 as ts,
'2021-03-21' as dt
| ) as s0
| on t0.id = s0.id
| when not matched and s0.id % 2 = 1 then insert *
@@ -87,7 +87,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
s"""
| merge into $tableName as t0
| using (
- | select 1 as id, 'a1' as name, 11 as price, 1000L as ts,
'2021-03-21' as dt
+ | select 1 as id, 'a1' as name, 11 as price, 1000 as ts,
'2021-03-21' as dt
| ) as s0
| on t0.id = s0.id
| when matched and s0.id % 2 = 0 then update set *
@@ -104,7 +104,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
s"""
| merge into $tableName as t0
| using (
- | select 1 as id, 'a1' as name, 11 as price, 1000L as ts,
'2021-03-21' as dt
+ | select 1 as id, 'a1' as name, 11 as price, 1000 as ts,
'2021-03-21' as dt
| ) as s0
| on t0.id = s0.id
| when matched and s0.id % 2 = 1 then update set id = s0.id, name =
s0.name,
@@ -121,7 +121,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
s"""
| merge into $tableName as t0
| using (
- | select 1 as id, 'a1' as name, 11 as price, 1000L as ts,
'2021-03-21' as dt
+ | select 1 as id, 'a1' as name, 11 as price, 1000 as ts,
'2021-03-21' as dt
| ) as s0
| on t0.id = s0.id
| when matched and s0.id % 2 = 0 then update set id = s0.id, name =
s0.name,
@@ -138,7 +138,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
s"""
| merge into $tableName as t0
| using (
- | select 1 as id, 'a1' as name, 10 as price, 1000L as ts,
'2021-03-21' as dt
+ | select 1 as id, 'a1' as name, 10 as price, 1000 as ts,
'2021-03-21' as dt
| ) as s0
| on t0.id = s0.id
| when matched and s0.id % 2 = 1 then update set id = s0.id, name =
s0.name,
@@ -194,7 +194,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
| s_value struct<f0: int, f1: string>,
| a_value array<string>,
| m_value map<string, string>,
- | ts long
+ | ts int
| ) using hudi
| tblproperties (
| type = 'mor',
@@ -257,7 +257,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
| id int,
| name string,
| price double,
- | ts long,
+ | ts int,
| dt string
|) using hudi
| location '${tmp.getCanonicalPath}/$tableName'
@@ -345,7 +345,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
| id int,
| name string,
| price double,
- | ts long
+ | ts int
|) using hudi
| location '${tmp.getCanonicalPath}/$tableName'
| tblproperties (
@@ -390,7 +390,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
| id int,
| name string,
| price double,
- | ts long
+ | ts int
|) using hudi
| location '${tmp.getCanonicalPath}/$tableName'
| tblproperties (
@@ -457,7 +457,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
| ID int,
| name string,
| price double,
- | TS long,
+ | ts int,
| DT string
|) using hudi
| location '${tmp.getCanonicalPath}/$tableName'
@@ -529,7 +529,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
| ID int,
| NAME string,
| price double,
- | TS long,
+ | ts int,
| dt string
| ) using hudi
| options (
@@ -571,7 +571,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
| id int,
| name string,
| price double,
- | ts long,
+ | ts int,
| dt string
|) using hudi
| location '${tmp.getCanonicalPath}/$tableName'
@@ -618,7 +618,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
| id2 int,
| name string,
| price double,
- | ts long,
+ | ts int,
| dt string
|) using hudi
| location '${tmp.getCanonicalPath}/$tableName'
@@ -664,7 +664,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
| id int,
| name string,
| price double,
- | ts long,
+ | ts int,
| dt string
| ) using hudi
| tblproperties (
@@ -702,7 +702,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
| id int,
| name string,
| price double,
- | ts long,
+ | ts int,
| dt string
| ) using hudi
| tblproperties (
@@ -742,7 +742,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
| create table $tableName (
| id int,
| name string,
- | ts long
+ | ts int
| ) using hudi
| tblproperties (
| type = 'cow',
@@ -783,7 +783,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
| id int,
| name string,
| price double,
- | ts long,
+ | ts int,
| dt string
| ) using hudi
| tblproperties (
@@ -848,7 +848,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
| id int,
| name string,
| price double,
- | ts long,
+ | ts int,
| dt string
| ) using hudi
| tblproperties (
@@ -913,7 +913,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
| id int,
| name string,
| price double,
- | ts long,
+ | ts int,
| dt string
| ) using hudi
| tblproperties (
@@ -962,7 +962,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
| id int,
| name string,
| price double,
- | ts long,
+ | ts int,
| dt string
| ) using hudi
| tblproperties (
@@ -992,7 +992,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
| id int,
| name string,
| price double,
- | ts long,
+ | ts int,
| dt string
| ) using hudi
| tblproperties (
@@ -1080,7 +1080,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
| dt = s0.dt
|when matched and s0.id = 2 then update set *
""".stripMargin
- )("No matching assignment found for target table record key
field `id`")
+ )("MERGE INTO field resolution error: No matching assignment
found for target table record key field `id`")
checkException(
s"""
@@ -1095,7 +1095,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
| ts = s0.ts,
| dt = s0.dt
""".stripMargin
- )("No matching assignment found for target table record key
field `id`")
+ )("MERGE INTO field resolution error: No matching assignment
found for target table record key field `id`")
}
// Test 2: At least one partial insert assignment clause misses
primary key.
@@ -1110,7 +1110,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
|values (s0.name, s0.price, s0.ts, s0.dt)
|when not matched and s0.id = 2 then insert *
""".stripMargin
- )("No matching assignment found for target table record key field
`id`")
+ )("MERGE INTO field resolution error: No matching assignment found
for target table record key field `id`")
checkException(
s"""
@@ -1122,7 +1122,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
|when not matched then insert (name, price, ts, dt)
|values (s0.name, s0.price, s0.ts, s0.dt)
""".stripMargin
- )("No matching assignment found for target table record key field
`id`")
+ )("MERGE INTO field resolution error: No matching assignment found
for target table record key field `id`")
// Test 3: Partial insert missing preCombineField - only validate
for EVENT_TIME_ORDERING
val mergeStmt =
@@ -1139,7 +1139,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
if (mergeMode == "EVENT_TIME_ORDERING") {
checkException(mergeStmt)(
- "No matching assignment found for target table precombine
field `ts`"
+ "MERGE INTO field resolution error: No matching assignment
found for target table precombine field `ts`"
)
} else {
// For COMMIT_TIME_ORDERING, this should execute without error
@@ -1168,7 +1168,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
withRecordType()(withTempDir { tmp =>
Seq("cow", "mor").foreach { tableType =>
Seq("COMMIT_TIME_ORDERING", "EVENT_TIME_ORDERING").foreach { mergeMode
=>
-
withSparkSqlSessionConfig(DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key
-> "false") {
+
withSparkSqlSessionConfig(DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key
-> "true") {
val tableName = generateTableName
spark.sql(
s"""
@@ -1221,28 +1221,6 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase
{
withTempDir { tmp =>
Seq(RecordMergeMode.COMMIT_TIME_ORDERING.name(),
RecordMergeMode.EVENT_TIME_ORDERING.name()).foreach {
recordMergeMode =>
- val sourceTable = generateTableName
- spark.sql(
- s"""
- |CREATE TABLE $sourceTable (
- | id INT,
- | name STRING,
- | price INT,
- | ts BIGINT
- |) USING hudi
- | tblproperties (
- | type = '$tableType'
- | )
- |LOCATION '${tmp.getCanonicalPath}/$sourceTable'
- |""".stripMargin)
-
- spark.sql(
- s"""
- | INSERT INTO $sourceTable
- | VALUES (1, 'John Doe', 19, 1),
- | (4, 'Alice Johnson', 49, 2)
- |""".stripMargin)
-
val targetTable = generateTableName
spark.sql(
s"""
@@ -1277,7 +1255,19 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase
{
spark.sql(
s"""
|MERGE INTO $targetTable t
- |USING $sourceTable s
+ |USING (
+ | SELECT
+ | CAST(1 AS INT) as id,
+ | CAST('John Doe' AS STRING) as name,
+ | CAST(19 AS INT) as price,
+ | CAST(1 AS BIGINT) as ts
+ | UNION ALL
+ | SELECT
+ | CAST(4 AS INT),
+ | CAST('Alice Johnson' AS STRING),
+ | CAST(49 AS INT),
+ | CAST(2 AS BIGINT)
+ |) s
|ON t.price = s.price
|WHEN MATCHED THEN UPDATE SET
| t.id = s.id,
@@ -1303,30 +1293,6 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase
{
test("Test MergeInto with CUSTOM merge mode using FirstValueAvroPayload") {
withRecordType()(withTempDir { tmp =>
Seq("cow", "mor").foreach { tableType =>
- val sourceTable = generateTableName
- spark.sql(
- s"""
- |CREATE TABLE $sourceTable (
- | id INT,
- | name STRING,
- | price INT,
- | ts BIGINT
- |) USING hudi
- | tblproperties (
- | type = '$tableType'
- | )
- |LOCATION '${tmp.getCanonicalPath}/$sourceTable'
- |""".stripMargin)
-
- // Insert source data with same ts=1598886001 for id=1
- spark.sql(
- s"""
- | INSERT INTO $sourceTable
- | VALUES (1, 'John Doe Updated', 19, 1598886001),
- | (2, 'Jane Doe Updated', 24, 1598972401),
- | (4, 'Alice Johnson', 49, 2)
- |""".stripMargin)
-
val targetTable = generateTableName
spark.sql(
s"""
@@ -1337,7 +1303,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
| ts BIGINT
|) using hudi
|TBLPROPERTIES (
- | type = 'cow',
+ | type = '$tableType',
| primaryKey = 'id',
| preCombineField = 'ts',
| recordMergeMode = '${RecordMergeMode.CUSTOM.name()}',
@@ -1347,44 +1313,60 @@ class TestMergeIntoTable2 extends
HoodieSparkSqlTestBase {
|LOCATION '${tmp.getCanonicalPath}/$targetTable'
|""".stripMargin)
- spark.sql(
- s"""
- |INSERT INTO $targetTable
- |SELECT id, name, price, ts
- |FROM (
- | SELECT 1 as id, 'John Doe Initial' as name, 19 as price,
1598886001 as ts
- | UNION ALL
- | SELECT 2, 'Jane Doe', 24, 1598972400
- | UNION ALL
- | SELECT 3, 'Bob Smith', 14, 1599058800
- |)
- |""".stripMargin)
+ spark.sql(
+ s"""
+ |INSERT INTO $targetTable
+ |SELECT id, name, price, ts
+ |FROM (
+ | SELECT 1 as id, 'John Doe Initial' as name, 19 as price,
1598886001 as ts
+ | UNION ALL
+ | SELECT 2, 'Jane Doe', 24, 1598972400
+ | UNION ALL
+ | SELECT 3, 'Bob Smith', 14, 1599058800
+ |)
+ |""".stripMargin)
- spark.sql(
- s"""
- |MERGE INTO $targetTable t
- |USING $sourceTable s
- |ON t.price = s.price
- |WHEN MATCHED THEN UPDATE SET
- | t.id = s.id,
- | t.name = s.name,
- | t.price = s.price,
- | t.ts = s.ts
- |WHEN NOT MATCHED THEN INSERT
- | (id, name, price, ts)
- |VALUES
- | (s.id, s.name, s.price, s.ts)
- |""".stripMargin)
+ spark.sql(
+ s"""
+ |MERGE INTO $targetTable t
+ |USING (
+ | SELECT
+ | CAST(1 AS INT) as id,
+ | CAST('John Doe Updated' AS STRING) as name,
+ | CAST(19 AS INT) as price,
+ | CAST(1598886001 AS BIGINT) as ts
+ | UNION ALL
+ | SELECT
+ | CAST(2 AS INT),
+ | CAST('Jane Doe Updated' AS STRING),
+ | CAST(24 AS INT),
+ | CAST(1598972401 AS BIGINT)
+ | UNION ALL
+ | SELECT
+ | CAST(4 AS INT),
+ | CAST('Alice Johnson' AS STRING),
+ | CAST(49 AS INT),
+ | CAST(2 AS BIGINT)
+ |) s
+ |ON t.price = s.price
+ |WHEN MATCHED THEN UPDATE SET
+ | t.id = s.id,
+ | t.name = s.name,
+ | t.price = s.price,
+ | t.ts = s.ts
+ |WHEN NOT MATCHED THEN INSERT
+ | (id, name, price, ts)
+ |VALUES
+ | (s.id, s.name, s.price, s.ts)
+ |""".stripMargin)
- // Verify FirstValueAvroPayload behavior:
- // - For id=1: keeps first value ("John Doe Initial") since timestamps
are equal
- // - For id=4: inserts new record normally
- checkAnswer(s"select id, name, price, ts from $targetTable ORDER BY
id")(
- Seq(1, "John Doe Initial", 19, 1598886001L), //
FirstValueAvroPayload keeps first record
- Seq(2, "Jane Doe Updated", 24, 1598972401L),
- Seq(3, "Bob Smith", 14, 1599058800L),
- Seq(4, "Alice Johnson", 49, 2L))
+ checkAnswer(s"select id, name, price, ts from $targetTable ORDER BY
id")(
+ Seq(1, "John Doe Initial", 19, 1598886001L),
+ Seq(2, "Jane Doe Updated", 24, 1598972401L),
+ Seq(3, "Bob Smith", 14, 1599058800L),
+ Seq(4, "Alice Johnson", 49, 2L))
+ }
}
- })
+ )
}
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTableWithNonRecordKeyField.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTableWithNonRecordKeyField.scala
index 7282eddfb25..233e94b0999 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTableWithNonRecordKeyField.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTableWithNonRecordKeyField.scala
@@ -182,7 +182,7 @@ class TestMergeIntoTableWithNonRecordKeyField extends
HoodieSparkSqlTestBase wit
| id int,
| name string,
| price double,
- | ts long
+ | ts int
|) using hudi
| location '${tmp.getCanonicalPath}'
| tblproperties (
@@ -257,7 +257,7 @@ class TestMergeIntoTableWithNonRecordKeyField extends
HoodieSparkSqlTestBase wit
| id int,
| name string,
| price double,
- | ts long
+ | ts int
|) using hudi
| location '${tmp.getCanonicalPath}'
| $prekstr
@@ -307,7 +307,7 @@ class TestMergeIntoTableWithNonRecordKeyField extends
HoodieSparkSqlTestBase wit
| id int,
| name string,
| price double,
- | ts long
+ | ts int
|) using hudi
| location '${tmp.getCanonicalPath}'
| tblproperties (
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeCommitTimeOrdering.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeCommitTimeOrdering.scala
index 5161e622a7c..0b705fd7cac 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeCommitTimeOrdering.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeCommitTimeOrdering.scala
@@ -212,6 +212,9 @@ class TestMergeModeCommitTimeOrdering extends
HoodieSparkSqlTestBase {
// Delete record
spark.sql(s"delete from $tableName where id = 1")
+ if (tableType == "mor") {
+
HoodieSparkSqlTestBase.validateDeleteLogBlockPrecombineNullOrZero(tmp.getCanonicalPath)
+ }
validateTableConfig(
storage, tmp.getCanonicalPath, expectedMergeConfigs,
nonExistentConfigs)
// Verify deletion
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeEventTimeOrdering.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeEventTimeOrdering.scala
index 69a5c83d6ae..baf6c976100 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeEventTimeOrdering.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeEventTimeOrdering.scala
@@ -226,6 +226,9 @@ class TestMergeModeEventTimeOrdering extends
HoodieSparkSqlTestBase {
)
// Delete record with no ts.
spark.sql(s"delete from $tableName where id = 1")
+ if (tableType == "mor") {
+
HoodieSparkSqlTestBase.validateDeleteLogBlockPrecombineNullOrZero(tmp.getCanonicalPath)
+ }
// Verify deletion
validateTableConfig(
storage, tmp.getCanonicalPath, expectedMergeConfigs,
nonExistentConfigs)
@@ -325,7 +328,7 @@ class TestMergeModeEventTimeOrdering extends
HoodieSparkSqlTestBase {
s"""
| merge into $tableName t
| using (
- | select 7 as id, 'G' as name, 70.0 as price, 99 as ts
union all
+ | select 7 as id, 'G' as name, 70.0 as price, 99L as ts
union all
| select 8, 'H', 80.0, 99 as ts
| ) s
| on t.id = s.id
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala
index f321d09039d..c12f78f27d3 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala
@@ -35,6 +35,7 @@ import org.apache.hudi.metadata.HoodieTableMetadata
import org.apache.avro.Schema
import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
+import
org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.getMetaClientAndFileSystemView
import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType,
StringType, StructField}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
@@ -94,7 +95,7 @@ class TestPartialUpdateForMergeInto extends
HoodieSparkSqlTestBase {
| id int,
| name string,
| price double,
- | _ts long,
+ | _ts int,
| description string
|) using hudi
|tblproperties(
@@ -144,7 +145,7 @@ class TestPartialUpdateForMergeInto extends
HoodieSparkSqlTestBase {
| id int,
| name string,
| price double,
- | _ts long,
+ | _ts int,
| description string
|) using hudi
|tblproperties(
@@ -205,7 +206,7 @@ class TestPartialUpdateForMergeInto extends
HoodieSparkSqlTestBase {
| id int,
| name string,
| price double,
- | _ts long,
+ | _ts int,
| description string
|) using hudi
|tblproperties(
@@ -219,7 +220,7 @@ class TestPartialUpdateForMergeInto extends
HoodieSparkSqlTestBase {
StructField("id", IntegerType, nullable = true),
StructField("name", StringType, nullable = true),
StructField("price", DoubleType, nullable = true),
- StructField("_ts", LongType, nullable = true),
+ StructField("_ts", IntegerType, nullable = true),
StructField("description", StringType, nullable = true))
spark.sql(s"insert into $tableName values (1, 'a1', 10, 1000, 'a1:
desc1')," +
@@ -274,7 +275,7 @@ class TestPartialUpdateForMergeInto extends
HoodieSparkSqlTestBase {
| id int,
| name string,
| price double,
- | _ts long,
+ | _ts int,
| description string
|) using hudi
|tblproperties(
@@ -288,7 +289,7 @@ class TestPartialUpdateForMergeInto extends
HoodieSparkSqlTestBase {
StructField("id", IntegerType, nullable = true),
StructField("name", StringType, nullable = true),
StructField("price", DoubleType, nullable = true),
- StructField("_ts", LongType, nullable = true),
+ StructField("_ts", IntegerType, nullable = true),
StructField("description", StringType, nullable = true))
spark.sql(s"insert into $tableName values (1, 'a1', 10, 1000, 'a1:
desc1')," +
"(2, 'a2', 20, 1200, 'a2: desc2'), (3, 'a3', 30, 1250, 'a3: desc3')")
@@ -444,7 +445,7 @@ class TestPartialUpdateForMergeInto extends
HoodieSparkSqlTestBase {
| id int,
| name string,
| price double,
- | _ts long,
+ | _ts int,
| description string
|) using hudi
|tblproperties(
@@ -528,20 +529,7 @@ class TestPartialUpdateForMergeInto extends
HoodieSparkSqlTestBase {
expectedNumLogFile: Int,
changedFields: Seq[Seq[String]],
isPartial: Boolean): Unit = {
- val storageConf = HoodieTestUtils.getDefaultStorageConf
- val metaClient: HoodieTableMetaClient =
-
HoodieTableMetaClient.builder.setConf(storageConf).setBasePath(basePath).build
- val metadataConfig = HoodieMetadataConfig.newBuilder.build
- val engineContext = new HoodieLocalEngineContext(storageConf)
- val viewManager: FileSystemViewManager =
FileSystemViewManager.createViewManager(
- engineContext, metadataConfig,
FileSystemViewStorageConfig.newBuilder.build,
- HoodieCommonConfig.newBuilder.build,
- (v1: HoodieTableMetaClient) => {
- HoodieTableMetadata.create(
- engineContext, metaClient.getStorage, metadataConfig,
metaClient.getBasePath.toString)
- }
- )
- val fsView: SyncableFileSystemView =
viewManager.getFileSystemView(metaClient)
+ val (metaClient, fsView) = getMetaClientAndFileSystemView(basePath)
val fileSlice: Optional[FileSlice] = fsView.getAllFileSlices("")
.filter((fileSlice: FileSlice) => {
HoodieTestUtils.getLogFileListFromFileSlice(fileSlice).size() ==
expectedNumLogFile