kazuyukitanimura commented on code in PR #471:
URL: https://github.com/apache/datafusion-comet/pull/471#discussion_r1624946391
########## spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala:
##########
@@ -1469,5 +1469,103 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
       }
     }
   }
+
+  test("unary negative integer overflow test") {
+    def withAnsiMode(enabled: Boolean)(f: => Unit): Unit = {
+      withSQLConf(
+        SQLConf.ANSI_ENABLED.key -> enabled.toString,
+        CometConf.COMET_ANSI_MODE_ENABLED.key -> enabled.toString,
+        CometConf.COMET_ENABLED.key -> "true",
+        CometConf.COMET_EXEC_ENABLED.key -> "true")(f)
+    }
+
+    def checkOverflow(query: String, dtype: String): Unit = {
+      checkSparkMaybeThrows(sql(query)) match {
+        case (Some(sparkException), Some(cometException)) =>
+          assert(sparkException.getMessage.contains(dtype + " overflow"))
+          assert(cometException.getMessage.contains(dtype + " overflow"))
+        case (None, None) => assert(true) // got same outputs
+        case (None, Some(ex)) =>
+          fail("Comet threw an exception but Spark did not " + ex.getMessage)
+        case (Some(_), None) =>
+          fail("Spark threw an exception but Comet did not")
+      }
+    }
+
+    def runArrayTest(query: String, dtype: String, path: String): Unit = {
+      withParquetTable(path, "t") {
+        withAnsiMode(enabled = false) {
+          checkSparkAnswerAndOperator(sql(query))
+        }
+        withAnsiMode(enabled = true) {
+          checkOverflow(query, dtype)
+        }
+      }
+    }
+
+    withTempDir { dir =>
+      // Array values test
+      val arrayPath = new Path(dir.toURI.toString, "array_test.parquet").toString
+      Seq(Int.MaxValue, Int.MinValue).toDF("a").write.mode("overwrite").parquet(arrayPath)
+      val arrayQuery = "select a, -a from t"
+      runArrayTest(arrayQuery, "integer", arrayPath)
+
+      // long values test
+      val longArrayPath = new Path(dir.toURI.toString, "long_array_test.parquet").toString
+      Seq(Long.MaxValue, Long.MinValue)
+        .toDF("a")
+        .write
+        .mode("overwrite")
+        .parquet(longArrayPath)
+      val longArrayQuery = "select a, -a from t"
+      runArrayTest(longArrayQuery, "long", longArrayPath)
+
+      // short values test
+      val shortArrayPath = new Path(dir.toURI.toString, "short_array_test.parquet").toString
+      Seq(Short.MaxValue, Short.MinValue)
+        .toDF("a")
+        .write
+        .mode("overwrite")
+        .parquet(shortArrayPath)
+      val shortArrayQuery = "select a, -a from t"
+      runArrayTest(shortArrayQuery, " caused", shortArrayPath)
+
+      // byte values test
+      val byteArrayPath = new Path(dir.toURI.toString, "byte_array_test.parquet").toString
+      Seq(Byte.MaxValue, Byte.MinValue)
+        .toDF("a")
+        .write
+        .mode("overwrite")
+        .parquet(byteArrayPath)
+      val byteArrayQuery = "select a, -a from t"
+      runArrayTest(byteArrayQuery, " caused", byteArrayPath)
+
+      // interval values test
+      withTable("t_interval") {
+        spark.sql("CREATE TABLE t_interval(a STRING) USING PARQUET")
+        spark.sql("INSERT INTO t_interval VALUES ('INTERVAL 10000000000 YEAR')")
+        withAnsiMode(enabled = true) {
+          spark
+            .sql("SELECT CAST(a AS INTERVAL) AS a FROM t_interval")
+            .createOrReplaceTempView("t_interval_casted")
+          checkOverflow("SELECT a, -a FROM t_interval_casted", "interval")
+        }
+      }

Review Comment:
   It looks like this does not hit native code, as `CAST(a AS INTERVAL)` is not supported yet. So it will fall back to Spark, and `checkOverflow` ends up comparing Spark results on both sides. Perhaps there is no good way of creating an interval now...
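   One way to make the fallback visible in the test itself — a minimal sketch, not part of the PR, assuming `stripAQEPlan` from `AdaptiveSparkPlanHelper` (which this suite already mixes in) and Comet's `CometProjectExec` operator class; the view name comes from the test above:
   ```scala
   import org.apache.spark.sql.comet.CometProjectExec

   // If no Comet exec node survives in the executed plan, the query fell back
   // to Spark, and checkOverflow is effectively comparing Spark against Spark.
   val df = sql("SELECT a, -a FROM t_interval_casted")
   df.collect() // force execution so the adaptive plan is finalized
   val plan = stripAQEPlan(df.queryExecution.executedPlan)
   val ranNatively = plan.collect { case p: CometProjectExec => p }.nonEmpty
   assert(!ranNatively, "expected the interval query to fall back to Spark")
   ```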
########## spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala:
##########
@@ -1469,5 +1469,103 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
+      withTable("t") {
+        sql("create table t(a int) using parquet")
+        sql("insert into t values (-2147483648)")
+        withAnsiMode(enabled = true) {
+          checkOverflow("select a, -a from t", "integer")
+        }
+      }
+
+      withTable("t_float") {
+        sql("create table t_float(a float) using parquet")
+        sql("insert into t_float values (3.4128235E38)")
+        withAnsiMode(enabled = true) {
+          checkOverflow("select a, -a from t_float", "float")
+        }
+      }

Review Comment:
   BTW this will not test the scalar case unless we do something like
   ```
   withSQLConf(
     "spark.sql.optimizer.excludedRules" ->
       "org.apache.spark.sql.catalyst.optimizer.ConstantFolding") {
     checkOverflow("select a, -(a) from t_float", "float")
   }
   ```
   I think the current test creates a single-item array; unless that option is set, it is unlikely to hit the scalar scenario. It would be ideal to test scalar cases because user jobs may use that option, but it can be a follow-up fix.
check_overflow { + ($array:expr, $array_type:ty, $min_val:expr, $type_name:expr) => {{ + let typed_array = $array + .as_any() + .downcast_ref::<$array_type>() + .expect(concat!(stringify!($array_type), " expected")); + for i in 0..typed_array.len() { + if typed_array.value(i) == $min_val { + if $type_name == "byte" || $type_name == "short" { + let value = typed_array.value(i).to_string() + " caused"; + return Err(arithmetic_overflow_error(value.as_str()).into()); + } + return Err(arithmetic_overflow_error($type_name).into()); + } + } + }}; +} + +impl NegativeExpr { + /// Create new not expression + pub fn new(arg: Arc<dyn PhysicalExpr>, fail_on_error: bool) -> Self { + Self { arg, fail_on_error } + } + + /// Get the input expression + pub fn arg(&self) -> &Arc<dyn PhysicalExpr> { + &self.arg + } +} + +impl std::fmt::Display for NegativeExpr { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "(- {})", self.arg) + } +} + +impl PhysicalExpr for NegativeExpr { + /// Return a reference to Any that can be used for downcasting + fn as_any(&self) -> &dyn Any { + self + } + + fn data_type(&self, input_schema: &Schema) -> Result<DataType> { + self.arg.data_type(input_schema) + } + + fn nullable(&self, input_schema: &Schema) -> Result<bool> { + self.arg.nullable(input_schema) + } + + fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> { + let arg = self.arg.evaluate(batch)?; + + // overflow checks only apply in ANSI mode + // datatypes supported are byte, short, integer, long, float, interval + match arg { + ColumnarValue::Array(array) => { + if self.fail_on_error { + match array.data_type() { + DataType::Int8 => { + check_overflow!(array, arrow::array::Int8Array, i8::MIN, "byte") + } + DataType::Int16 => { + check_overflow!(array, arrow::array::Int16Array, i16::MIN, "short") + } + DataType::Int32 => { + check_overflow!(array, arrow::array::Int32Array, i32::MIN, "integer") + } + DataType::Int64 => { + check_overflow!(array, arrow::array::Int64Array, i64::MIN, "long") + } + DataType::Interval(value) => match value { + arrow::datatypes::IntervalUnit::YearMonth => check_overflow!( + array, + arrow::array::IntervalYearMonthArray, + i32::MIN, + "interval" + ), + arrow::datatypes::IntervalUnit::DayTime => check_overflow!( + array, + arrow::array::IntervalDayTimeArray, + i64::MIN, + "interval" + ), Review Comment: I was expecting to see that testing this fails because DataFusion `neg_wrapping` breaks the `i64` into two `i32`. Then I realized there is no good way of testing right now as mentioned above. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org