Davis-Zhang-Onehouse commented on code in PR #12798:
URL: https://github.com/apache/hudi/pull/12798#discussion_r1966164729


##########
hudi-spark-datasource/hudi-spark/src/test/resources/sql-statements.sql:
##########
@@ -63,10 +63,10 @@ select id, name, price, cast(dt as string) from h0_p;
 # CREATE TABLE
 
 create table h1 (
-  id bigint,
+  id int,
   name string,
   price double,
-  ts bigint
+  ts int

Review Comment:
   as we discussed, I will extend an existing test with long data type as 
precombine field and cover delete query + validate the log file precombine 
filed is 0 or null



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable.scala:
##########
@@ -73,7 +73,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with 
ScalaAssertionSuppo
              | ) s0
              | on s0.id = $tableName.id
              | when matched and flag = '1' then update set
-             | id = s0.id, name = s0.name, price = s0.price, ts = s0.ts
+             | id = s0.id, name = s0.name, price = s0.price, ts = s0.ts + 1

Review Comment:
   reverted



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeCommitTimeOrdering.scala:
##########
@@ -242,7 +242,7 @@ class TestMergeModeCommitTimeOrdering extends 
HoodieSparkSqlTestBase {
                |  id int,
                |  name string,
                |  price double,
-               |  ts long
+               |  ts int

Review Comment:
   discussed, will have coverage on what you want and keep this change as is



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestTableColumnTypeMismatch.scala:
##########


Review Comment:
   oh, thanks for catching this. Moved to the "common" package.



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestTableColumnTypeMismatch.scala:
##########
@@ -0,0 +1,941 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import org.apache.hudi.{DataSourceWriteOptions, ScalaAssertionSupport}
+
+import org.apache.spark.sql.AnalysisException
+import 
org.apache.spark.sql.hudi.ErrorMessageChecker.isIncompatibleDataException
+import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
+
+class TestTableColumnTypeMismatch extends HoodieSparkSqlTestBase with 
ScalaAssertionSupport {

Review Comment:
   combined as many test dimensions as possible into a single 1, bringing down 
the test runtime from 3 min 40 sec to 2 min 10 sec.
   
   If this is not enough, please point out which tests (dimension) we should 
throw away



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestTableColumnTypeMismatch.scala:
##########
@@ -0,0 +1,941 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import org.apache.hudi.{DataSourceWriteOptions, ScalaAssertionSupport}
+
+import org.apache.spark.sql.AnalysisException
+import 
org.apache.spark.sql.hudi.ErrorMessageChecker.isIncompatibleDataException
+import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
+
+class TestTableColumnTypeMismatch extends HoodieSparkSqlTestBase with 
ScalaAssertionSupport {
+
+  test("Test Spark implicit type casting behaviors") {
+    // Capturing the current behavior of Spark's implicit type casting.
+    withRecordType()(withTempDir { tmp =>
+      // Define test cases for implicit casting
+      case class TypeCastTestCase(
+                                   sourceType: String,
+                                   targetType: String,
+                                   testValue: String, // SQL literal expression
+                                   expectedValue: Any,
+                                   shouldSucceed: Boolean,
+                                   description: String = ""
+                                 )
+
+      val testCases = Seq(
+        // Numeric widening conversions (always safe)
+        TypeCastTestCase("tinyint", "smallint", "127", 127, true, "tinyint to 
smallint widening"),
+        TypeCastTestCase("tinyint", "int", "127", 127, true, "tinyint to int 
widening"),
+        TypeCastTestCase("tinyint", "bigint", "127", 127L, true, "tinyint to 
bigint widening"),
+        TypeCastTestCase("tinyint", "float", "127", 127.0f, true, "tinyint to 
float widening"),
+        TypeCastTestCase("tinyint", "double", "127", 127.0d, true, "tinyint to 
double widening"),
+        TypeCastTestCase("tinyint", "decimal(10,1)", "127", 
java.math.BigDecimal.valueOf(127.0), true, "tinyint to decimal widening"),
+
+        TypeCastTestCase("smallint", "int", "32767", 32767, true, "smallint to 
int widening"),
+        TypeCastTestCase("smallint", "bigint", "32767", 32767L, true, 
"smallint to bigint widening"),
+        TypeCastTestCase("smallint", "float", "32767", 32767.0f, true, 
"smallint to float widening"),

Review Comment:
   done



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestAlterTable.scala:
##########
@@ -49,7 +49,7 @@ class TestAlterTable extends HoodieSparkSqlTestBase {
              |  id int,
              |  name string,
              |  price double,
-             |  ts long
+             |  ts int

Review Comment:
   as discussed, need to extend 1 test to cover explicitly



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##########
@@ -924,26 +972,82 @@ object MergeIntoHoodieTableCommand {
                                                  fields: Seq[String],
                                                  fieldType: String,
                                                  assignments: 
Seq[Assignment]): Unit = {
-    // To find corresponding [[fieldType]] attribute w/in the [[assignments]] 
we do
-    //    - Check if target table itself has the attribute
-    //    - Check if in any of the assignment actions, whose right-hand side 
attribute
-    // resolves to the source attribute. For example,
-    //        WHEN MATCHED THEN UPDATE SET targetTable.attribute = <expr>
-    // the left-hand side of the assignment can be resolved to the target 
fields we are
-    // validating here.
     fields.foreach { field =>
       targetTable.output
         .find(attr => resolver(attr.name, field))
-        .getOrElse(throw new AnalysisException(s"Failed to resolve $fieldType 
`$field` in target table"))
+        .getOrElse(throw new MergeIntoFieldResolutionException(s"Failed to 
resolve $fieldType `$field` in target table"))
 
       if (!assignments.exists {
         case Assignment(attr: AttributeReference, _) if resolver(attr.name, 
field) => true
         case _ => false
       }) {
-        throw new AnalysisException(s"No matching assignment found for target 
table $fieldType `$field`")
+        throw new MergeIntoFieldResolutionException(s"No matching assignment 
found for target table $fieldType `$field`")
       }
     }
   }
+
+  /**
+   * Generic method to resolve field associations between target and source 
tables
+   *
+   * @param resolver The resolver to use
+   * @param targetTable The target table of the merge
+   * @param sourceTable The source table of the merge
+   * @param fields The fields from the target table whose association with the 
source to be resolved
+   * @param fieldType String describing the type of field (for error messages)
+   * @param assignments The assignments clause of the merge into used for 
resolving the association
+   * @return Sequence of resolved (target table attribute, source table 
expression)
+   * mapping for target [[fields]].
+   *
+   * @throws AnalysisException if a field cannot be resolved
+   */
+  def resolveFieldAssociationsBetweenSourceAndTarget(resolver: Resolver,
+                                                     targetTable: LogicalPlan,
+                                                     sourceTable: LogicalPlan,
+                                                     fields: Seq[String],
+                                                     fieldType: String,
+                                                     assignments: 
Seq[Assignment]
+                             ): Seq[(Attribute, Expression)] = {
+    fields.map { field =>
+      val targetAttribute = targetTable.output
+        .find(attr => resolver(attr.name, field))
+        .getOrElse(throw new MergeIntoFieldResolutionException(
+          s"Failed to resolve $fieldType `$field` in target table"))
+
+      val sourceExpr = sourceTable.output
+        .find(attr => resolver(attr.name, field))
+        .getOrElse {
+          assignments.collectFirst {
+            case Assignment(attr: AttributeReference, expr)
+              if resolver(attr.name, field) && 
resolvesToSourceAttribute(sourceTable, expr) => expr
+          }.getOrElse {
+            throw new MergeIntoFieldResolutionException(
+              s"Failed to resolve $fieldType `$field` w/in the source-table 
output")
+          }
+        }
+
+      (targetAttribute, sourceExpr)
+    }
+  }
+
+  def resolvesToSourceAttribute(sourceTable: LogicalPlan, expr: Expression): 
Boolean = {
+    val sourceTableOutputSet = sourceTable.outputSet
+    expr match {
+      case attr: AttributeReference => sourceTableOutputSet.contains(attr)
+      case MatchCast(attr: AttributeReference, _, _, _) => 
sourceTableOutputSet.contains(attr)
+
+      case _ => false
+    }
+  }
+
+  def validateDataTypes(attr: Attribute, expr: Expression, columnType: 
String): Unit = {
+    if (attr.dataType != expr.dataType) {
+      throw new AnalysisException(

Review Comment:
   MergeIntoFieldResolutionException is specially for case where we could not 
find some column. If some exceptions other than AnalysisException is required, 
I can create a new type of exception



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable.scala:
##########
@@ -1373,45 +1373,6 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase 
with ScalaAssertionSuppo
 
       spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
 
-      // Can't down-cast incoming dataset's primary-key w/o loss of precision 
(should fail)
-      val errorMsg = "Invalid MERGE INTO matching condition: s0.id: can't cast 
s0.id (of LongType) to IntegerType"
-
-      checkExceptionContain(
-        s"""
-           |merge into $tableName h0
-           |using (
-           |  select cast(1 as long) as id, 1001 as ts
-           | ) s0
-           | on cast(h0.id as long) = s0.id
-           | when matched then update set h0.ts = s0.ts
-           |""".stripMargin)(errorMsg)
-
-      // Can't down-cast incoming dataset's primary-key w/o loss of precision 
(should fail)
-      checkExceptionContain(
-        s"""
-           |merge into $tableName h0
-           |using (
-           |  select cast(1 as long) as id, 1002 as ts
-           | ) s0
-           | on h0.id = s0.id
-           | when matched then update set h0.ts = s0.ts
-           |""".stripMargin)(errorMsg)
-
-      // Can up-cast incoming dataset's primary-key w/o loss of precision 
(should succeed)
-      spark.sql(
-        s"""
-           |merge into $tableName h0
-           |using (
-           |  select cast(1 as short) as id, 1003 as ts
-           | ) s0
-           | on h0.id = s0.id
-           | when matched then update set h0.ts = s0.ts
-           |""".stripMargin)
-
-      checkAnswer(s"select id, name, value, ts from $tableName")(
-        Seq(1, "a1", 10, 1003)
-      )

Review Comment:
   yeah, I added a whole test suite cover type mismatch as above



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to