This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 2741ae76 feat: Only allow incompatible cast expressions to run in comet if a config is enabled (#362)
2741ae76 is described below

commit 2741ae76bf5a7e2132800523c772f105868f472c
Author: Andy Grove <[email protected]>
AuthorDate: Fri May 3 13:24:04 2024 -0600

    feat: Only allow incompatible cast expressions to run in comet if a config is enabled (#362)
---
 .../main/scala/org/apache/comet/CometConf.scala    |  49 +-----
 ...{compatibility.md => compatibility-template.md} |  18 +-
 docs/source/user-guide/compatibility.md            | 159 ++++++++++++++---
 docs/source/user-guide/configs.md                  |   2 +-
 spark/pom.xml                                      |  13 +-
 .../main/scala/org/apache/comet/GenerateDocs.scala |  94 ++++++++++
 .../org/apache/comet/expressions/CometCast.scala   | 196 +++++++++++++++++++++
 .../org/apache/comet/serde/QueryPlanSerde.scala    |  54 +++---
 .../scala/org/apache/comet/CometCastSuite.scala    | 111 ++++++++----
 .../org/apache/comet/CometExpressionSuite.scala    | 108 +++++++-----
 .../apache/comet/exec/CometAggregateSuite.scala    |   3 +-
 .../org/apache/comet/exec/CometExecSuite.scala     |   4 +-
 .../spark/sql/comet/CometPlanStabilitySuite.scala  |   1 +
 13 files changed, 624 insertions(+), 188 deletions(-)
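
For anyone trying the feature out, the new flag is an ordinary Spark configuration setting. A minimal sketch of enabling it when building a session (the session setup itself is illustrative and not part of this commit; only the config key comes from it):

import org.apache.spark.sql.SparkSession

// Hypothetical example session; spark.comet.cast.allowIncompatible defaults to false.
val spark = SparkSession
  .builder()
  .appName("comet-cast-example")
  .config("spark.comet.cast.allowIncompatible", "true") // opt in to incompatible casts
  .getOrCreate()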

diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index ca4bf470..26114090 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -19,11 +19,9 @@
 
 package org.apache.comet
 
-import java.io.{BufferedOutputStream, FileOutputStream}
 import java.util.concurrent.TimeUnit
 
 import scala.collection.mutable.ListBuffer
-import scala.io.Source
 
 import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.network.util.JavaUtils
@@ -376,12 +374,14 @@ object CometConf {
     .booleanConf
     .createWithDefault(false)
 
-  val COMET_CAST_STRING_TO_TIMESTAMP: ConfigEntry[Boolean] = conf(
-    "spark.comet.cast.stringToTimestamp")
-    .doc(
-      "Comet is not currently fully compatible with Spark when casting from 
String to Timestamp.")
-    .booleanConf
-    .createWithDefault(false)
+  val COMET_CAST_ALLOW_INCOMPATIBLE: ConfigEntry[Boolean] =
+    conf("spark.comet.cast.allowIncompatible")
+      .doc(
+        "Comet is not currently fully compatible with Spark for all cast 
operations. " +
+          "Set this config to true to allow them anyway. See compatibility 
guide " +
+          "for more information.")
+      .booleanConf
+      .createWithDefault(false)
 
 }
 
@@ -625,36 +625,3 @@ private[comet] case class ConfigBuilder(key: String) {
 private object ConfigEntry {
   val UNDEFINED = "<undefined>"
 }
-
-/**
- * Utility for generating markdown documentation from the configs.
- *
- * This is invoked when running `mvn clean package -DskipTests`.
- */
-object CometConfGenerateDocs {
-  def main(args: Array[String]): Unit = {
-    if (args.length != 2) {
-      // scalastyle:off println
-      println("Missing arguments for template file and output file")
-      // scalastyle:on println
-      sys.exit(-1)
-    }
-    val templateFilename = args.head
-    val outputFilename = args(1)
-    val w = new BufferedOutputStream(new FileOutputStream(outputFilename))
-    for (line <- Source.fromFile(templateFilename).getLines()) {
-      if (line.trim == "<!--CONFIG_TABLE-->") {
-        val publicConfigs = CometConf.allConfs.filter(_.isPublic)
-        val confs = publicConfigs.sortBy(_.key)
-        w.write("| Config | Description | Default Value |\n".getBytes)
-        w.write("|--------|-------------|---------------|\n".getBytes)
-        for (conf <- confs) {
-          w.write(s"| ${conf.key} | ${conf.doc.trim} | 
${conf.defaultValueString} |\n".getBytes)
-        }
-      } else {
-        w.write(s"${line.trim}\n".getBytes)
-      }
-    }
-    w.close()
-  }
-}
diff --git a/docs/source/user-guide/compatibility.md b/docs/source/user-guide/compatibility-template.md
similarity index 64%
copy from docs/source/user-guide/compatibility.md
copy to docs/source/user-guide/compatibility-template.md
index b4b4c92e..deaca2d2 100644
--- a/docs/source/user-guide/compatibility.md
+++ b/docs/source/user-guide/compatibility-template.md
@@ -34,13 +34,17 @@ There is an [epic](https://github.com/apache/datafusion-comet/issues/313) where
 
 ## Cast
 
-Comet currently delegates to Apache DataFusion for most cast operations, and this means that the behavior is not
-guaranteed to be consistent with Spark.
+Cast operations in Comet fall into three levels of support:
 
-There is an [epic](https://github.com/apache/datafusion-comet/issues/286) where we are tracking the work to implement Spark-compatible cast expressions.
+- **Compatible**: The results match Apache Spark
+- **Incompatible**: The results may match Apache Spark for some inputs, but there are known issues where some inputs
+  will result in incorrect results or exceptions. The query stage will fall back to Spark by default. Setting
+  `spark.comet.cast.allowIncompatible=true` will allow all incompatible casts to run natively in Comet, but this is not
+  recommended for production use.
+- **Unsupported**: Comet does not provide a native version of this cast expression and the query stage will fall back to
+  Spark.
 
-### Cast from String to Timestamp
+The following table shows the current cast operations supported by Comet. Any cast that does not appear in this
+table (such as those involving complex types and timestamp_ntz) is not supported by Comet.
 
-Casting from String to Timestamp is disabled by default due to incompatibilities with Spark, including timezone
-issues, and can be enabled by setting `spark.comet.castStringToTimestamp=true`. See the
-[tracking issue](https://github.com/apache/datafusion-comet/issues/328) for more information.
+<!--CAST_TABLE-->
diff --git a/docs/source/user-guide/compatibility.md b/docs/source/user-guide/compatibility.md
index b4b4c92e..9a2478d3 100644
--- a/docs/source/user-guide/compatibility.md
+++ b/docs/source/user-guide/compatibility.md
@@ -1,20 +1,20 @@
 <!---
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing,
-  software distributed under the License is distributed on an
-  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  KIND, either express or implied.  See the License for the
-  specific language governing permissions and limitations
-  under the License.
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
 -->
 
 # Compatibility Guide
@@ -34,13 +34,126 @@ There is an [epic](https://github.com/apache/datafusion-comet/issues/313) where
 
 ## Cast
 
-Comet currently delegates to Apache DataFusion for most cast operations, and this means that the behavior is not
-guaranteed to be consistent with Spark.
+Cast operations in Comet fall into three levels of support:
 
-There is an [epic](https://github.com/apache/datafusion-comet/issues/286) where we are tracking the work to implement Spark-compatible cast expressions.
+- **Compatible**: The results match Apache Spark
+- **Incompatible**: The results may match Apache Spark for some inputs, but there are known issues where some inputs
+  will result in incorrect results or exceptions. The query stage will fall back to Spark by default. Setting
+  `spark.comet.cast.allowIncompatible=true` will allow all incompatible casts to run natively in Comet, but this is not
+  recommended for production use.
+- **Unsupported**: Comet does not provide a native version of this cast expression and the query stage will fall back to
+  Spark.
 
-### Cast from String to Timestamp
+The following table shows the current cast operations supported by Comet. Any cast that does not appear in this
+table (such as those involving complex types and timestamp_ntz) is not supported by Comet.
 
-Casting from String to Timestamp is disabled by default due to incompatibilities with Spark, including timezone
-issues, and can be enabled by setting `spark.comet.castStringToTimestamp=true`. See the
-[tracking issue](https://github.com/apache/datafusion-comet/issues/328) for more information.
+| From Type | To Type   | Compatible?  | Notes                               |
+| --------- | --------- | ------------ | ----------------------------------- |
+| boolean   | byte      | Compatible   |                                     |
+| boolean   | short     | Compatible   |                                     |
+| boolean   | integer   | Compatible   |                                     |
+| boolean   | long      | Compatible   |                                     |
+| boolean   | float     | Compatible   |                                     |
+| boolean   | double    | Compatible   |                                     |
+| boolean   | decimal   | Unsupported  |                                     |
+| boolean   | string    | Compatible   |                                     |
+| boolean   | timestamp | Unsupported  |                                     |
+| byte      | boolean   | Compatible   |                                     |
+| byte      | short     | Compatible   |                                     |
+| byte      | integer   | Compatible   |                                     |
+| byte      | long      | Compatible   |                                     |
+| byte      | float     | Compatible   |                                     |
+| byte      | double    | Compatible   |                                     |
+| byte      | decimal   | Compatible   |                                     |
+| byte      | string    | Compatible   |                                     |
+| byte      | binary    | Unsupported  |                                     |
+| byte      | timestamp | Unsupported  |                                     |
+| short     | boolean   | Compatible   |                                     |
+| short     | byte      | Compatible   |                                     |
+| short     | integer   | Compatible   |                                     |
+| short     | long      | Compatible   |                                     |
+| short     | float     | Compatible   |                                     |
+| short     | double    | Compatible   |                                     |
+| short     | decimal   | Compatible   |                                     |
+| short     | string    | Compatible   |                                     |
+| short     | binary    | Unsupported  |                                     |
+| short     | timestamp | Unsupported  |                                     |
+| integer   | boolean   | Compatible   |                                     |
+| integer   | byte      | Compatible   |                                     |
+| integer   | short     | Compatible   |                                     |
+| integer   | long      | Compatible   |                                     |
+| integer   | float     | Compatible   |                                     |
+| integer   | double    | Compatible   |                                     |
+| integer   | decimal   | Compatible   |                                     |
+| integer   | string    | Compatible   |                                     |
+| integer   | binary    | Unsupported  |                                     |
+| integer   | timestamp | Unsupported  |                                     |
+| long      | boolean   | Compatible   |                                     |
+| long      | byte      | Compatible   |                                     |
+| long      | short     | Compatible   |                                     |
+| long      | integer   | Compatible   |                                     |
+| long      | float     | Compatible   |                                     |
+| long      | double    | Compatible   |                                     |
+| long      | decimal   | Compatible   |                                     |
+| long      | string    | Compatible   |                                     |
+| long      | binary    | Unsupported  |                                     |
+| long      | timestamp | Unsupported  |                                     |
+| float     | boolean   | Compatible   |                                     |
+| float     | byte      | Unsupported  |                                     |
+| float     | short     | Unsupported  |                                     |
+| float     | integer   | Unsupported  |                                     |
+| float     | long      | Unsupported  |                                     |
+| float     | double    | Compatible   |                                     |
+| float     | decimal   | Unsupported  |                                     |
+| float     | string    | Incompatible |                                     |
+| float     | timestamp | Unsupported  |                                     |
+| double    | boolean   | Compatible   |                                     |
+| double    | byte      | Unsupported  |                                     |
+| double    | short     | Unsupported  |                                     |
+| double    | integer   | Unsupported  |                                     |
+| double    | long      | Unsupported  |                                     |
+| double    | float     | Compatible   |                                     |
+| double    | decimal   | Incompatible |                                     |
+| double    | string    | Incompatible |                                     |
+| double    | timestamp | Unsupported  |                                     |
+| decimal   | boolean   | Unsupported  |                                     |
+| decimal   | byte      | Unsupported  |                                     |
+| decimal   | short     | Unsupported  |                                     |
+| decimal   | integer   | Unsupported  |                                     |
+| decimal   | long      | Unsupported  |                                     |
+| decimal   | float     | Compatible   |                                     |
+| decimal   | double    | Compatible   |                                     |
+| decimal   | string    | Unsupported  |                                     |
+| decimal   | timestamp | Unsupported  |                                     |
+| string    | boolean   | Compatible   |                                     |
+| string    | byte      | Compatible   |                                     |
+| string    | short     | Compatible   |                                     |
+| string    | integer   | Compatible   |                                     |
+| string    | long      | Compatible   |                                     |
+| string    | float     | Unsupported  |                                     |
+| string    | double    | Unsupported  |                                     |
+| string    | decimal   | Unsupported  |                                     |
+| string    | binary    | Compatible   |                                     |
+| string    | date      | Unsupported  |                                     |
+| string    | timestamp | Incompatible | Not all valid formats are supported |
+| binary    | string    | Incompatible |                                     |
+| date      | boolean   | Unsupported  |                                     |
+| date      | byte      | Unsupported  |                                     |
+| date      | short     | Unsupported  |                                     |
+| date      | integer   | Unsupported  |                                     |
+| date      | long      | Unsupported  |                                     |
+| date      | float     | Unsupported  |                                     |
+| date      | double    | Unsupported  |                                     |
+| date      | decimal   | Unsupported  |                                     |
+| date      | string    | Compatible   |                                     |
+| date      | timestamp | Unsupported  |                                     |
+| timestamp | boolean   | Unsupported  |                                     |
+| timestamp | byte      | Unsupported  |                                     |
+| timestamp | short     | Unsupported  |                                     |
+| timestamp | integer   | Unsupported  |                                     |
+| timestamp | long      | Compatible   |                                     |
+| timestamp | float     | Unsupported  |                                     |
+| timestamp | double    | Unsupported  |                                     |
+| timestamp | decimal   | Unsupported  |                                     |
+| timestamp | string    | Compatible   |                                     |
+| timestamp | date      | Compatible   |                                     |
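
To observe the fallback behavior the guide above describes, one can cast a string column to a timestamp and inspect the plan. A sketch, assuming a SparkSession `spark` with the Comet extensions loaded (the data and column name are illustrative):

import org.apache.spark.sql.functions.col
import spark.implicits._

val df = Seq("2020-01-01T12:34:56.123456", "T2").toDF("a")

// With spark.comet.cast.allowIncompatible at its default (false), this incompatible
// cast causes the query stage to fall back to Spark.
df.withColumn("ts", col("a").cast("timestamp")).explain()

// Opting in lets the cast run natively in Comet (not recommended for production use).
spark.conf.set("spark.comet.cast.allowIncompatible", "true")
df.withColumn("ts", col("a").cast("timestamp")).explain()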
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index 3a16cd47..02ecbd69 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -25,7 +25,7 @@ Comet provides the following configuration settings.
 |--------|-------------|---------------|
 | spark.comet.ansi.enabled | Comet does not respect ANSI mode in most cases and by default will not accelerate queries when ansi mode is enabled. Enable this setting to test Comet's experimental support for ANSI mode. This should not be used in production. | false |
 | spark.comet.batchSize | The columnar batch size, i.e., the maximum number of rows that a batch can contain. | 8192 |
-| spark.comet.cast.stringToTimestamp | Comet is not currently fully compatible with Spark when casting from String to Timestamp. | false |
+| spark.comet.cast.allowIncompatible | Comet is not currently fully compatible with Spark for all cast operations. Set this config to true to allow them anyway. See compatibility guide for more information. | false |
 | spark.comet.columnar.shuffle.async.enabled | Whether to enable asynchronous shuffle for Arrow-based shuffle. By default, this config is false. | false |
 | spark.comet.columnar.shuffle.async.max.thread.num | Maximum number of threads on an executor used for Comet async columnar shuffle. By default, this config is 100. This is the upper bound of total number of shuffle threads per executor. In other words, if the number of cores * the number of shuffle threads per task `spark.comet.columnar.shuffle.async.thread.num` is larger than this config, Comet will use this config as the number of shuffle threads per executor instead. | 100 |
 | spark.comet.columnar.shuffle.async.thread.num | Number of threads used for Comet async columnar shuffle per shuffle task. By default, this config is 3. Note that more threads means more memory requirement to buffer shuffle data before flushing to disk. Also, more threads may not always improve performance, and should be set based on the number of cores available. | 3 |
diff --git a/spark/pom.xml b/spark/pom.xml
index 9392b7fe..7d3d3d75 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -58,6 +58,11 @@ under the License.
       <groupId>org.scala-lang</groupId>
       <artifactId>scala-library</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-reflect</artifactId>
+      <scope>provided</scope>
+    </dependency>
     <dependency>
       <groupId>com.google.protobuf</groupId>
       <artifactId>protobuf-java</artifactId>
@@ -270,17 +275,13 @@ under the License.
         <version>3.2.0</version>
         <executions>
           <execution>
-            <id>generate-config-docs</id>
+            <id>generate-user-guide-reference-docs</id>
             <phase>package</phase>
             <goals>
               <goal>java</goal>
             </goals>
             <configuration>
-              <mainClass>org.apache.comet.CometConfGenerateDocs</mainClass>
-              <arguments>
-                <argument>docs/source/user-guide/configs-template.md</argument>
-                <argument>docs/source/user-guide/configs.md</argument>
-              </arguments>
+              <mainClass>org.apache.comet.GenerateDocs</mainClass>
               <classpathScope>compile</classpathScope>
             </configuration>
           </execution>
diff --git a/spark/src/main/scala/org/apache/comet/GenerateDocs.scala b/spark/src/main/scala/org/apache/comet/GenerateDocs.scala
new file mode 100644
index 00000000..8c414c7f
--- /dev/null
+++ b/spark/src/main/scala/org/apache/comet/GenerateDocs.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet
+
+import java.io.{BufferedOutputStream, FileOutputStream}
+
+import scala.io.Source
+
+import org.apache.spark.sql.catalyst.expressions.Cast
+
+import org.apache.comet.expressions.{CometCast, Compatible, Incompatible, Unsupported}
+
+/**
+ * Utility for generating markdown documentation from the configs and the supported cast operations.
+ *
+ * This is invoked when running `mvn clean package -DskipTests`.
+ */
+object GenerateDocs {
+
+  def main(args: Array[String]): Unit = {
+    generateConfigReference()
+    generateCompatibilityGuide()
+  }
+
+  private def generateConfigReference(): Unit = {
+    val templateFilename = "docs/source/user-guide/configs-template.md"
+    val outputFilename = "docs/source/user-guide/configs.md"
+    val w = new BufferedOutputStream(new FileOutputStream(outputFilename))
+    for (line <- Source.fromFile(templateFilename).getLines()) {
+      if (line.trim == "<!--CONFIG_TABLE-->") {
+        val publicConfigs = CometConf.allConfs.filter(_.isPublic)
+        val confs = publicConfigs.sortBy(_.key)
+        w.write("| Config | Description | Default Value |\n".getBytes)
+        w.write("|--------|-------------|---------------|\n".getBytes)
+        for (conf <- confs) {
+          w.write(s"| ${conf.key} | ${conf.doc.trim} | 
${conf.defaultValueString} |\n".getBytes)
+        }
+      } else {
+        w.write(s"${line.trim}\n".getBytes)
+      }
+    }
+    w.close()
+  }
+
+  private def generateCompatibilityGuide(): Unit = {
+    val templateFilename = "docs/source/user-guide/compatibility-template.md"
+    val outputFilename = "docs/source/user-guide/compatibility.md"
+    val w = new BufferedOutputStream(new FileOutputStream(outputFilename))
+    for (line <- Source.fromFile(templateFilename).getLines()) {
+      if (line.trim == "<!--CAST_TABLE-->") {
+        w.write("| From Type | To Type | Compatible? | Notes |\n".getBytes)
+        w.write("|-|-|-|-|\n".getBytes)
+        for (fromType <- CometCast.supportedTypes) {
+          for (toType <- CometCast.supportedTypes) {
+            if (Cast.canCast(fromType, toType) && fromType != toType) {
+              val fromTypeName = fromType.typeName.replace("(10,2)", "")
+              val toTypeName = toType.typeName.replace("(10,2)", "")
+              CometCast.isSupported(fromType, toType, None, "LEGACY") match {
+                case Compatible =>
+                  w.write(s"| $fromTypeName | $toTypeName | Compatible | 
|\n".getBytes)
+                case Incompatible(Some(reason)) =>
+                  w.write(s"| $fromTypeName | $toTypeName | Incompatible | 
$reason |\n".getBytes)
+                case Incompatible(None) =>
+                  w.write(s"| $fromTypeName | $toTypeName | Incompatible | 
|\n".getBytes)
+                case Unsupported =>
+                  w.write(s"| $fromTypeName | $toTypeName | Unsupported | 
|\n".getBytes)
+              }
+            }
+          }
+        }
+      } else {
+        w.write(s"${line.trim}\n".getBytes)
+      }
+    }
+    w.close()
+  }
+}
diff --git a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala
new file mode 100644
index 00000000..5641c94a
--- /dev/null
+++ b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.expressions
+
+import org.apache.spark.sql.types.{DataType, DataTypes, DecimalType}
+
+sealed trait SupportLevel
+
+/** We support this feature with full compatibility with Spark */
+object Compatible extends SupportLevel
+
+/** We support this feature but results can be different from Spark */
+case class Incompatible(reason: Option[String] = None) extends SupportLevel
+
+/** We do not support this feature */
+object Unsupported extends SupportLevel
+
+object CometCast {
+
+  def supportedTypes: Seq[DataType] =
+    Seq(
+      DataTypes.BooleanType,
+      DataTypes.ByteType,
+      DataTypes.ShortType,
+      DataTypes.IntegerType,
+      DataTypes.LongType,
+      DataTypes.FloatType,
+      DataTypes.DoubleType,
+      DataTypes.createDecimalType(10, 2),
+      DataTypes.StringType,
+      DataTypes.BinaryType,
+      DataTypes.DateType,
+      DataTypes.TimestampType)
+  // TODO add DataTypes.TimestampNTZType for Spark 3.4 and later
+  // https://github.com/apache/datafusion-comet/issues/378
+
+  def isSupported(
+      fromType: DataType,
+      toType: DataType,
+      timeZoneId: Option[String],
+      evalMode: String): SupportLevel = {
+
+    if (fromType == toType) {
+      return Compatible
+    }
+
+    (fromType, toType) match {
+      case (dt: DataType, _) if dt.typeName == "timestamp_ntz" =>
+        // https://github.com/apache/datafusion-comet/issues/378
+        toType match {
+          case DataTypes.TimestampType | DataTypes.DateType | DataTypes.StringType =>
+            Incompatible()
+          case _ =>
+            Unsupported
+        }
+      case (_: DecimalType, _: DecimalType) =>
+        // https://github.com/apache/datafusion-comet/issues/375
+        Incompatible()
+      case (DataTypes.StringType, _) =>
+        canCastFromString(toType, timeZoneId, evalMode)
+      case (_, DataTypes.StringType) =>
+        canCastToString(fromType)
+      case (DataTypes.TimestampType, _) =>
+        canCastFromTimestamp(toType)
+      case (_: DecimalType, _) =>
+        canCastFromDecimal(toType)
+      case (DataTypes.BooleanType, _) =>
+        canCastFromBoolean(toType)
+      case (
+            DataTypes.ByteType | DataTypes.ShortType | DataTypes.IntegerType | DataTypes.LongType,
+            _) =>
+        canCastFromInt(toType)
+      case (DataTypes.FloatType, _) =>
+        canCastFromFloat(toType)
+      case (DataTypes.DoubleType, _) =>
+        canCastFromDouble(toType)
+      case _ => Unsupported
+    }
+  }
+
+  private def canCastFromString(
+      toType: DataType,
+      timeZoneId: Option[String],
+      evalMode: String): SupportLevel = {
+    toType match {
+      case DataTypes.BooleanType =>
+        Compatible
+      case DataTypes.ByteType | DataTypes.ShortType | DataTypes.IntegerType |
+          DataTypes.LongType =>
+        Compatible
+      case DataTypes.BinaryType =>
+        Compatible
+      case DataTypes.FloatType | DataTypes.DoubleType =>
+        // https://github.com/apache/datafusion-comet/issues/326
+        Unsupported
+      case _: DecimalType =>
+        // https://github.com/apache/datafusion-comet/issues/325
+        Unsupported
+      case DataTypes.DateType =>
+        // https://github.com/apache/datafusion-comet/issues/327
+        Unsupported
+      case DataTypes.TimestampType if timeZoneId.exists(tz => tz != "UTC") =>
+        Incompatible(Some(s"Cast will use UTC instead of $timeZoneId"))
+      case DataTypes.TimestampType if evalMode == "ANSI" =>
+        Incompatible(Some("ANSI mode not supported"))
+      case DataTypes.TimestampType =>
+        // https://github.com/apache/datafusion-comet/issues/328
+        Incompatible(Some("Not all valid formats are supported"))
+      case _ =>
+        Unsupported
+    }
+  }
+
+  private def canCastToString(fromType: DataType): SupportLevel = {
+    fromType match {
+      case DataTypes.BooleanType => Compatible
+      case DataTypes.ByteType | DataTypes.ShortType | DataTypes.IntegerType |
+          DataTypes.LongType =>
+        Compatible
+      case DataTypes.DateType => Compatible
+      case DataTypes.TimestampType => Compatible
+      case DataTypes.FloatType | DataTypes.DoubleType =>
+        // https://github.com/apache/datafusion-comet/issues/326
+        Incompatible()
+      case DataTypes.BinaryType =>
+        // https://github.com/apache/datafusion-comet/issues/377
+        Incompatible()
+      case _ => Unsupported
+    }
+  }
+
+  private def canCastFromTimestamp(toType: DataType): SupportLevel = {
+    toType match {
+      case DataTypes.BooleanType | DataTypes.ByteType | DataTypes.ShortType |
+          DataTypes.IntegerType =>
+        // https://github.com/apache/datafusion-comet/issues/352
+        // this seems like an edge case that isn't important for us to support
+        Unsupported
+      case DataTypes.LongType =>
+        // https://github.com/apache/datafusion-comet/issues/352
+        Compatible
+      case DataTypes.StringType => Compatible
+      case DataTypes.DateType => Compatible
+      case _ => Unsupported
+    }
+  }
+
+  private def canCastFromBoolean(toType: DataType): SupportLevel = toType match {
+    case DataTypes.ByteType | DataTypes.ShortType | DataTypes.IntegerType | DataTypes.LongType |
+        DataTypes.FloatType | DataTypes.DoubleType =>
+      Compatible
+    case _ => Unsupported
+  }
+
+  private def canCastFromInt(toType: DataType): SupportLevel = toType match {
+    case DataTypes.BooleanType | DataTypes.ByteType | DataTypes.ShortType |
+        DataTypes.IntegerType | DataTypes.LongType | DataTypes.FloatType | DataTypes.DoubleType |
+        _: DecimalType =>
+      Compatible
+    case _ => Unsupported
+  }
+
+  private def canCastFromFloat(toType: DataType): SupportLevel = toType match {
+    case DataTypes.BooleanType | DataTypes.DoubleType => Compatible
+    case _ => Unsupported
+  }
+
+  private def canCastFromDouble(toType: DataType): SupportLevel = toType match {
+    case DataTypes.BooleanType | DataTypes.FloatType => Compatible
+    case _: DecimalType => Incompatible()
+    case _ => Unsupported
+  }
+
+  private def canCastFromDecimal(toType: DataType): SupportLevel = toType match {
+    case DataTypes.FloatType | DataTypes.DoubleType => Compatible
+    case _ => Unsupported
+  }
+
+}
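
Callers consume this API by matching on the returned SupportLevel. A minimal sketch of the pattern (the println branches are illustrative; the real caller is the QueryPlanSerde change below):

import org.apache.spark.sql.types.DataTypes
import org.apache.comet.expressions.{CometCast, Compatible, Incompatible, Unsupported}

// string -> timestamp in a non-UTC timezone under LEGACY eval mode is Incompatible
CometCast.isSupported(
  DataTypes.StringType,
  DataTypes.TimestampType,
  Some("America/Denver"),
  "LEGACY") match {
  case Compatible => println("run natively in Comet")
  case Incompatible(reason) =>
    println(s"fall back unless spark.comet.cast.allowIncompatible=true: $reason")
  case Unsupported => println("always fall back to Spark")
}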
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index c07b2b3c..e77adc9b 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -43,6 +43,7 @@ import org.apache.spark.unsafe.types.UTF8String
 
 import org.apache.comet.CometConf
 import org.apache.comet.CometSparkSessionExtensions.{isCometOperatorEnabled, isCometScan, isSpark32, isSpark34Plus, withInfo}
+import org.apache.comet.expressions.{CometCast, Compatible, Incompatible, Unsupported}
 import org.apache.comet.serde.ExprOuterClass.{AggExpr, DataType => ProtoDataType, Expr, ScalarFunc}
 import org.apache.comet.serde.ExprOuterClass.DataType.{DataTypeInfo, DecimalInfo, ListInfo, MapInfo, StructInfo}
 import org.apache.comet.serde.OperatorOuterClass.{AggregateMode => CometAggregateMode, JoinType, Operator}
@@ -585,30 +586,35 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde {
               // Spark 3.4+ has EvalMode enum with values LEGACY, ANSI, and TRY
               evalMode.toString
             }
-
-            val supportedTimezone = (child.dataType, dt) match {
-              case (DataTypes.StringType, DataTypes.TimestampType)
-                  if !timeZoneId.contains("UTC") =>
-                withInfo(expr, s"Unsupported timezone ${timeZoneId} for 
timestamp cast")
-                false
-              case _ => true
-            }
-
-            val supportedCast = (child.dataType, dt) match {
-              case (DataTypes.StringType, DataTypes.TimestampType)
-                  if !CometConf.COMET_CAST_STRING_TO_TIMESTAMP.get() =>
-                // https://github.com/apache/datafusion-comet/issues/328
-                withInfo(expr, s"${CometConf.COMET_CAST_STRING_TO_TIMESTAMP.key} is disabled")
-                false
-              case _ => true
-            }
-
-            if (supportedCast && supportedTimezone) {
-              castToProto(timeZoneId, dt, childExpr, evalModeStr)
-            } else {
-              // no need to call withInfo here since it was called when determining
-              // the value for `supportedCast`
-              None
+            val castSupport =
+              CometCast.isSupported(child.dataType, dt, timeZoneId, evalModeStr)
+
+            def getIncompatMessage(reason: Option[String]) =
+              "Comet does not guarantee correct results for cast " +
+                s"from ${child.dataType} to $dt " +
+                s"with timezone $timeZoneId and evalMode $evalModeStr" +
+                reason.map(str => s" ($str)").getOrElse("")
+
+            castSupport match {
+              case Compatible =>
+                castToProto(timeZoneId, dt, childExpr, evalModeStr)
+              case Incompatible(reason) =>
+                if (CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.get()) {
+                  logWarning(getIncompatMessage(reason))
+                  castToProto(timeZoneId, dt, childExpr, evalModeStr)
+                } else {
+                  withInfo(
+                    expr,
+                    s"${getIncompatMessage(reason)}. To enable all 
incompatible casts, set " +
+                      s"${CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key}=true")
+                  None
+                }
+              case Unsupported =>
+                withInfo(
+                  expr,
+                  s"Unsupported cast from ${child.dataType} to $dt " +
+                    s"with timezone $timeZoneId and evalMode $evalModeStr")
+                None
             }
           } else {
             withInfo(expr, child)
diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
index 3be7dcb6..54b13679 100644
--- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
@@ -30,6 +30,8 @@ import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{DataType, DataTypes}
 
+import org.apache.comet.expressions.CometCast
+
 class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   import testImplicits._
 
@@ -72,22 +74,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
       }
     }
 
-    // make sure we have tests for all combinations of our supported types
-    val supportedTypes =
-      Seq(
-        DataTypes.BooleanType,
-        DataTypes.ByteType,
-        DataTypes.ShortType,
-        DataTypes.IntegerType,
-        DataTypes.LongType,
-        DataTypes.FloatType,
-        DataTypes.DoubleType,
-        DataTypes.createDecimalType(10, 2),
-        DataTypes.StringType,
-        DataTypes.DateType,
-        DataTypes.TimestampType)
-    // TODO add DataTypes.TimestampNTZType for Spark 3.4 and later
-    assertTestsExist(supportedTypes, supportedTypes)
+    assertTestsExist(CometCast.supportedTypes, CometCast.supportedTypes)
   }
 
   // CAST from BooleanType
@@ -164,6 +151,10 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     castTest(generateBytes(), DataTypes.StringType)
   }
 
+  ignore("cast ByteType to BinaryType") {
+    castTest(generateBytes(), DataTypes.BinaryType)
+  }
+
   ignore("cast ByteType to TimestampType") {
     // input: -1, expected: 1969-12-31 15:59:59.0, actual: 1969-12-31 15:59:59.999999
     castTest(generateBytes(), DataTypes.TimestampType)
@@ -204,6 +195,10 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     castTest(generateShorts(), DataTypes.StringType)
   }
 
+  ignore("cast ShortType to BinaryType") {
+    castTest(generateShorts(), DataTypes.BinaryType)
+  }
+
   ignore("cast ShortType to TimestampType") {
     // input: -1003, expected: 1969-12-31 15:43:17.0, actual: 1969-12-31 15:59:59.998997
     castTest(generateShorts(), DataTypes.TimestampType)
@@ -246,6 +241,10 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     castTest(generateInts(), DataTypes.StringType)
   }
 
+  ignore("cast IntegerType to BinaryType") {
+    castTest(generateInts(), DataTypes.BinaryType)
+  }
+
   ignore("cast IntegerType to TimestampType") {
     // input: -1000479329, expected: 1938-04-19 01:04:31.0, actual: 1969-12-31 15:43:19.520671
     castTest(generateInts(), DataTypes.TimestampType)
@@ -289,6 +288,10 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     castTest(generateLongs(), DataTypes.StringType)
   }
 
+  ignore("cast LongType to BinaryType") {
+    castTest(generateLongs(), DataTypes.BinaryType)
+  }
+
   ignore("cast LongType to TimestampType") {
     // java.lang.ArithmeticException: long overflow
     castTest(generateLongs(), DataTypes.TimestampType)
@@ -537,23 +540,47 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     castTest(values, DataTypes.createDecimalType(10, 2))
   }
 
+  test("cast StringType to BinaryType") {
+    castTest(generateStrings(numericPattern, 8).toDF("a"), DataTypes.BinaryType)
+  }
+
   ignore("cast StringType to DateType") {
     // https://github.com/apache/datafusion-comet/issues/327
     castTest(generateStrings(datePattern, 8).toDF("a"), DataTypes.DateType)
   }
 
   test("cast StringType to TimestampType disabled by default") {
-    val values = Seq("2020-01-01T12:34:56.123456", "T2").toDF("a")
-    castFallbackTest(
-      values.toDF("a"),
-      DataTypes.TimestampType,
-      "spark.comet.cast.stringToTimestamp is disabled")
+    withSQLConf((SQLConf.SESSION_LOCAL_TIMEZONE.key, "UTC")) {
+      val values = Seq("2020-01-01T12:34:56.123456", "T2").toDF("a")
+      castFallbackTest(
+        values.toDF("a"),
+        DataTypes.TimestampType,
+        "Not all valid formats are supported")
+    }
+  }
+
+  test("cast StringType to TimestampType disabled for non-UTC timezone") {
+    withSQLConf((SQLConf.SESSION_LOCAL_TIMEZONE.key, "America/Denver")) {
+      val values = Seq("2020-01-01T12:34:56.123456", "T2").toDF("a")
+      castFallbackTest(
+        values.toDF("a"),
+        DataTypes.TimestampType,
+        "Cast will use UTC instead of Some(America/Denver)")
+    }
+  }
+
+  ignore("cast StringType to TimestampType (fuzz test)") {
+    // https://github.com/apache/datafusion-comet/issues/328
+    withSQLConf((CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key, "true")) {
+      val values = Seq("2020-01-01T12:34:56.123456", "T2") ++ generateStrings(timestampPattern, 8)
+      castTest(values.toDF("a"), DataTypes.TimestampType)
+    }
   }
 
   test("cast StringType to TimestampType") {
     withSQLConf(
       SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC",
-      CometConf.COMET_CAST_STRING_TO_TIMESTAMP.key -> "true") {
+      CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") {
       val values = Seq(
         "2020",
         "2020-01",
@@ -570,15 +597,17 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     // test for invalid inputs
     withSQLConf(
       SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC",
-      CometConf.COMET_CAST_STRING_TO_TIMESTAMP.key -> "true") {
+      CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") {
       val values = Seq("-9?", "1-", "0.5")
       castTimestampTest(values.toDF("a"), DataTypes.TimestampType)
     }
   }
 
-  test("cast StringType to TimestampType with invalid timezone") {
-    val values = Seq("2020-01-01T12:34:56.123456", "T2")
-    castFallbackTestTimezone(values.toDF("a"), DataTypes.TimestampType, "Unsupported timezone")
+  // CAST from BinaryType
+
+  ignore("cast BinaryType to StringType") {
+    // TODO implement this
+    // https://github.com/apache/datafusion-comet/issues/377
   }
 
   // CAST from DateType
@@ -657,9 +686,8 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     castTest(generateTimestamps(), DataTypes.IntegerType)
   }
 
-  ignore("cast TimestampType to LongType") {
-    // https://github.com/apache/datafusion-comet/issues/352
-    // input: 2023-12-31 17:00:00.0, expected: 1.70407078E9, actual: 1.70407082E15]
+  test("cast TimestampType to LongType") {
+    assume(CometSparkSessionExtensions.isSpark33Plus)
     castTest(generateTimestamps(), DataTypes.LongType)
   }
 
@@ -818,7 +846,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
 
       withSQLConf(
         (SQLConf.ANSI_ENABLED.key, "false"),
-        (CometConf.COMET_CAST_STRING_TO_TIMESTAMP.key, "true"),
+        (CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key, "true"),
         (SQLConf.SESSION_LOCAL_TIMEZONE.key, "America/Los_Angeles")) {
         val df = data.withColumn("converted", col("a").cast(toType))
         df.collect()
@@ -847,6 +875,11 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   }
 
   private def castTest(input: DataFrame, toType: DataType): Unit = {
+
+    // we do not support the TryCast expression in Spark 3.2 and 3.3
+    // https://github.com/apache/datafusion-comet/issues/374
+    val testTryCast = CometSparkSessionExtensions.isSpark34Plus
+
     withTempPath { dir =>
       val data = roundtripParquet(input, dir).coalesce(1)
       data.createOrReplaceTempView("t")
@@ -854,12 +887,14 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
       withSQLConf((SQLConf.ANSI_ENABLED.key, "false")) {
        // cast() should return null for invalid inputs when ansi mode is disabled
        val df = spark.sql(s"select a, cast(a as ${toType.sql}) from t order by a")
-        checkSparkAnswer(df)
+        checkSparkAnswerAndOperator(df)
 
         // try_cast() should always return null for invalid inputs
-        val df2 =
-          spark.sql(s"select a, try_cast(a as ${toType.sql}) from t order by 
a")
-        checkSparkAnswer(df2)
+        if (testTryCast) {
+          val df2 =
+            spark.sql(s"select a, try_cast(a as ${toType.sql}) from t order by 
a")
+          checkSparkAnswerAndOperator(df2)
+        }
       }
 
       // with ANSI enabled, we should produce the same exception as Spark
@@ -899,9 +934,11 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
         }
 
         // try_cast() should always return null for invalid inputs
-        val df2 =
-          spark.sql(s"select a, try_cast(a as ${toType.sql}) from t order by 
a")
-        checkSparkAnswer(df2)
+        if (testTryCast) {
+          val df2 =
+            spark.sql(s"select a, try_cast(a as ${toType.sql}) from t order by 
a")
+          checkSparkAnswerAndOperator(df2)
+        }
       }
     }
   }
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index 3683c8d4..c8c7ffd5 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -259,7 +259,9 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   }
 
   test("cast timestamp and timestamp_ntz") {
-    withSQLConf(SESSION_LOCAL_TIMEZONE.key -> "Asia/Kathmandu") {
+    withSQLConf(
+      SESSION_LOCAL_TIMEZONE.key -> "Asia/Kathmandu",
+      CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") {
       Seq(true, false).foreach { dictionaryEnabled =>
         withTempDir { dir =>
           val path = new Path(dir.toURI.toString, "timestamp_trunc.parquet")
@@ -282,7 +284,9 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     // TODO: make the test pass for Spark 3.2 & 3.3
     assume(isSpark34Plus)
 
-    withSQLConf(SESSION_LOCAL_TIMEZONE.key -> "Asia/Kathmandu") {
+    withSQLConf(
+      SESSION_LOCAL_TIMEZONE.key -> "Asia/Kathmandu",
+      CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") {
       Seq(true, false).foreach { dictionaryEnabled =>
         withTempDir { dir =>
           val path = new Path(dir.toURI.toString, "timestamp_trunc.parquet")
@@ -305,7 +309,9 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     // TODO: make the test pass for Spark 3.2 & 3.3
     assume(isSpark34Plus)
 
-    withSQLConf(SESSION_LOCAL_TIMEZONE.key -> "Asia/Kathmandu") {
+    withSQLConf(
+      SESSION_LOCAL_TIMEZONE.key -> "Asia/Kathmandu",
+      CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") {
       Seq(true, false).foreach { dictionaryEnabled =>
         withTempDir { dir =>
           val path = new Path(dir.toURI.toString, "timestamp_trunc.parquet")
@@ -394,32 +400,34 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
 
   test("date_trunc with timestamp_ntz") {
    assume(!isSpark32, "timestamp functions for timestamp_ntz have incorrect behavior in 3.2")
-    Seq(true, false).foreach { dictionaryEnabled =>
-      withTempDir { dir =>
-        val path = new Path(dir.toURI.toString, "timestamp_trunc.parquet")
-        makeRawTimeParquetFile(path, dictionaryEnabled = dictionaryEnabled, 10000)
-        withParquetTable(path.toString, "timetbl") {
-          Seq(
-            "YEAR",
-            "YYYY",
-            "YY",
-            "MON",
-            "MONTH",
-            "MM",
-            "QUARTER",
-            "WEEK",
-            "DAY",
-            "DD",
-            "HOUR",
-            "MINUTE",
-            "SECOND",
-            "MILLISECOND",
-            "MICROSECOND").foreach { format =>
-            checkSparkAnswerAndOperator(
-              "SELECT " +
-                s"date_trunc('$format', _3), " +
-                s"date_trunc('$format', _5)  " +
-                " from timetbl")
+    withSQLConf(CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") {
+      Seq(true, false).foreach { dictionaryEnabled =>
+        withTempDir { dir =>
+          val path = new Path(dir.toURI.toString, "timestamp_trunc.parquet")
+          makeRawTimeParquetFile(path, dictionaryEnabled = dictionaryEnabled, 10000)
+          withParquetTable(path.toString, "timetbl") {
+            Seq(
+              "YEAR",
+              "YYYY",
+              "YY",
+              "MON",
+              "MONTH",
+              "MM",
+              "QUARTER",
+              "WEEK",
+              "DAY",
+              "DD",
+              "HOUR",
+              "MINUTE",
+              "SECOND",
+              "MILLISECOND",
+              "MICROSECOND").foreach { format =>
+              checkSparkAnswerAndOperator(
+                "SELECT " +
+                  s"date_trunc('$format', _3), " +
+                  s"date_trunc('$format', _5)  " +
+                  " from timetbl")
+            }
           }
         }
       }
@@ -428,22 +436,24 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
 
   test("date_trunc with format array") {
    assume(isSpark33Plus, "TimestampNTZ is supported in Spark 3.3+, See SPARK-36182")
-    val numRows = 1000
-    Seq(true, false).foreach { dictionaryEnabled =>
-      withTempDir { dir =>
-        val path = new Path(dir.toURI.toString, "timestamp_trunc_with_format.parquet")
-        makeDateTimeWithFormatTable(path, dictionaryEnabled = dictionaryEnabled, numRows)
-        withParquetTable(path.toString, "timeformattbl") {
-          checkSparkAnswerAndOperator(
-            "SELECT " +
-              "format, _0, _1, _2, _3, _4, _5, " +
-              "date_trunc(format, _0), " +
-              "date_trunc(format, _1), " +
-              "date_trunc(format, _2), " +
-              "date_trunc(format, _3), " +
-              "date_trunc(format, _4), " +
-              "date_trunc(format, _5) " +
-              " from timeformattbl ")
+    withSQLConf(CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") {
+      val numRows = 1000
+      Seq(true, false).foreach { dictionaryEnabled =>
+        withTempDir { dir =>
+          val path = new Path(dir.toURI.toString, "timestamp_trunc_with_format.parquet")
+          makeDateTimeWithFormatTable(path, dictionaryEnabled = dictionaryEnabled, numRows)
+          withParquetTable(path.toString, "timeformattbl") {
+            checkSparkAnswerAndOperator(
+              "SELECT " +
+                "format, _0, _1, _2, _3, _4, _5, " +
+                "date_trunc(format, _0), " +
+                "date_trunc(format, _1), " +
+                "date_trunc(format, _2), " +
+                "date_trunc(format, _3), " +
+                "date_trunc(format, _4), " +
+                "date_trunc(format, _5) " +
+                " from timeformattbl ")
+          }
         }
       }
     }
@@ -818,7 +828,9 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
 
   test("ceil and floor") {
     Seq("true", "false").foreach { dictionary =>
-      withSQLConf("parquet.enable.dictionary" -> dictionary) {
+      withSQLConf(
+        "parquet.enable.dictionary" -> dictionary,
+        CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") {
         withParquetTable(
           (-5 until 5).map(i => (i.toDouble + 0.3, i.toDouble + 0.8)),
           "tbl",
@@ -1406,7 +1418,9 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
 
   test("hash functions") {
     Seq(true, false).foreach { dictionary =>
-      withSQLConf("parquet.enable.dictionary" -> dictionary.toString) {
+      withSQLConf(
+        "parquet.enable.dictionary" -> dictionary.toString,
+        CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") {
         val table = "test"
         withTable(table) {
           sql(s"create table $table(col string, a int, b float) using parquet")
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
index bd4042ec..fc6876fd 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
@@ -863,7 +863,8 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
       Seq(true, false).foreach { nativeShuffleEnabled =>
         withSQLConf(
          CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> nativeShuffleEnabled.toString,
-          CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false") {
+          CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false",
+          CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") {
           withTempDir { dir =>
             val path = new Path(dir.toURI.toString, "test")
             makeParquetFile(path, 1000, 20, dictionaryEnabled)
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index f3718356..bfde1403 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -949,7 +949,9 @@ class CometExecSuite extends CometTestBase {
   }
 
   test("SPARK-33474: Support typed literals as partition spec values") {
-    withSQLConf(SESSION_LOCAL_TIMEZONE.key -> "Asia/Kathmandu") {
+    withSQLConf(
+      SESSION_LOCAL_TIMEZONE.key -> "Asia/Kathmandu",
+      CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") {
       withTable("t1") {
         val binaryStr = "Spark SQL"
        val binaryHexStr = Hex.hex(UTF8String.fromString(binaryStr).getBytes).toString
diff --git a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
index 48969ea4..90ea7947 100644
--- a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
@@ -261,6 +261,7 @@ trait CometPlanStabilitySuite extends DisableAdaptiveExecutionSuite with TPCDSBa
       CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
       CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true",
       CometConf.COMET_EXEC_ALL_EXPR_ENABLED.key -> "true",
+      CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true", // needed for v1.4/q9, v1.4/q44, v2.7.0/q6, v2.7.0/q64
       "spark.sql.readSideCharPadding" -> "false",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "10MB") {
       val qe = sql(queryString).queryExecution


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
