This is an automated email from the ASF dual-hosted git repository. agrove pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push: new 8f8353bd9 fix : cast_operands_to_decimal_type_to_fix_arithmetic_overflow (#1996) 8f8353bd9 is described below commit 8f8353bd924d144d4ef494e39587c910a2763269 Author: B Vadlamani <v_vadlam...@apple.com> AuthorDate: Thu Jul 31 11:27:08 2025 -0700 fix : cast_operands_to_decimal_type_to_fix_arithmetic_overflow (#1996) --- .../scala/org/apache/comet/serde/arithmetic.scala | 20 +++++++++++++++----- .../org/apache/comet/CometExpressionSuite.scala | 16 +++++++++++++++- 2 files changed, 30 insertions(+), 6 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala b/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala index 3a7a9f8fb..605991a87 100644 --- a/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala +++ b/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala @@ -21,7 +21,7 @@ package org.apache.comet.serde import scala.math.min -import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, Divide, EqualTo, EvalMode, Expression, If, IntegralDivide, Literal, Multiply, Remainder, Subtract} +import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, Cast, Divide, EqualTo, EvalMode, Expression, If, IntegralDivide, Literal, Multiply, Remainder, Subtract} import org.apache.spark.sql.types.{ByteType, DataType, DecimalType, DoubleType, FloatType, IntegerType, LongType, ShortType} import org.apache.comet.CometSparkSessionExtensions.withInfo @@ -201,7 +201,6 @@ object CometIntegralDivide extends CometExpressionSerde with MathBase { inputs: Seq[Attribute], binding: Boolean): Option[ExprOuterClass.Expr] = { val div = expr.asInstanceOf[IntegralDivide] - val rightExpr = nullIfWhenPrimitive(div.right) if (!supportedDataType(div.left.dataType)) { withInfo(div, s"Unsupported datatype ${div.left.dataType}") @@ -212,17 +211,28 @@ object CometIntegralDivide extends CometExpressionSerde with MathBase { return None } - val dataType = (div.left.dataType, div.right.dataType) match { +// Precision is set to 19 (max precision for a numerical data type except DecimalType) + + val left = + if (div.left.dataType.isInstanceOf[DecimalType]) div.left + else Cast(div.left, DecimalType(19, 0)) + val right = + if (div.right.dataType.isInstanceOf[DecimalType]) div.right + else Cast(div.right, DecimalType(19, 0)) + + val rightExpr = nullIfWhenPrimitive(right) + + val dataType = (left.dataType, right.dataType) match { case (l: DecimalType, r: DecimalType) => // copy from IntegralDivide.resultDecimalType val intDig = l.precision - l.scale + r.scale DecimalType(min(if (intDig == 0) 1 else intDig, DecimalType.MAX_PRECISION), 0) - case _ => div.left.dataType + case _ => left.dataType } val divideExpr = createMathExpression( expr, - div.left, + left, rightExpr, inputs, binding, diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 1e6f52980..4f4554d9e 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -113,6 +113,18 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } } + test("Integral Division Overflow Handling Matches Spark Behavior") { + withTable("t1") { + withSQLConf(CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { + val value = Long.MinValue + sql("create table t1(c1 long, c2 short) using parquet") + sql(s"insert into t1 values($value, -1)") + val res = sql("select c1 div c2 from t1 order by c1") + checkSparkAnswerAndOperator(res) + } + } + } + test("basic data type support") { Seq(true, false).foreach { dictionaryEnabled => withTempDir { dir => @@ -2686,7 +2698,9 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("test integral divide") { // this test requires native_comet scan due to unsigned u8/u16 issue - withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) { + withSQLConf( + CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET, + CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { Seq(true, false).foreach { dictionaryEnabled => withTempDir { dir => val path1 = new Path(dir.toURI.toString, "test1.parquet") --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org