parthchandra commented on code in PR #1415:
URL: https://github.com/apache/datafusion-comet/pull/1415#discussion_r1962790306


##########
spark/src/main/scala/org/apache/comet/parquet/CometParquetPartitionReaderFactory.scala:
##########
@@ -71,14 +71,26 @@ case class CometParquetPartitionReaderFactory(
   // Comet specific configurations
   private val batchSize = CometConf.COMET_BATCH_SIZE.get(sqlConf)
 
+  @transient private lazy val usingDataFusionReader: Boolean = {
+    val conf = broadcastedConf.value.value
+    conf.getBoolean(
+      CometConf.COMET_NATIVE_SCAN_ENABLED.key,
+      CometConf.COMET_NATIVE_SCAN_ENABLED.defaultValue.get) &&
+    conf
+      .get(
+        CometConf.COMET_NATIVE_SCAN_IMPL.key,
+        CometConf.COMET_NATIVE_SCAN_IMPL.defaultValueString)
+      .equalsIgnoreCase(CometConf.SCAN_NATIVE_ICEBERG_COMPAT)
+  }
   // This is only called at executor on a Broadcast variable, so we don't want it to be
   // materialized at driver.
   @transient private lazy val preFetchEnabled = {
     val conf = broadcastedConf.value.value
 
     conf.getBoolean(
       CometConf.COMET_SCAN_PREFETCH_ENABLED.key,
-      CometConf.COMET_SCAN_PREFETCH_ENABLED.defaultValue.get)
+      CometConf.COMET_SCAN_PREFETCH_ENABLED.defaultValue.get) &&
+    !usingDataFusionReader // Turn off prefetch if native_iceberg_compat is enabled

Review Comment:
   No, we do not. Prefetch is supported only by `NATIVE_COMET`. `NATIVE_ICEBERG_COMPAT` shares the CometScan code path with `NATIVE_COMET`, so we need to explicitly preclude this case in the tests. For `NATIVE_DATAFUSION` the conf is never read and has no effect.
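   A minimal sketch of that rule, separate from the actual factory code; it assumes a `CometConf.SCAN_NATIVE_COMET` constant analogous to the `SCAN_NATIVE_ICEBERG_COMPAT` one used in the diff above:

```scala
import org.apache.comet.CometConf
import org.apache.hadoop.conf.Configuration

// Sketch only: prefetch is honored solely for the NATIVE_COMET scan impl.
// NATIVE_ICEBERG_COMPAT shares the CometScan code path, so prefetch is forced
// off there, and NATIVE_DATAFUSION never consults this conf at all.
def prefetchEffective(conf: Configuration): Boolean = {
  val requested = conf.getBoolean(
    CometConf.COMET_SCAN_PREFETCH_ENABLED.key,
    CometConf.COMET_SCAN_PREFETCH_ENABLED.defaultValue.get)
  val scanImpl = conf.get(
    CometConf.COMET_NATIVE_SCAN_IMPL.key,
    CometConf.COMET_NATIVE_SCAN_IMPL.defaultValueString)
  requested && scanImpl.equalsIgnoreCase(CometConf.SCAN_NATIVE_COMET)
}
```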



##########
spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala:
##########
@@ -1273,32 +1275,36 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
           -128,
           128,
           randomSize = 100)
-        withParquetTable(path.toString, "tbl") {
-          for (s <- Seq(-5, -1, 0, 1, 5, -1000, 1000, -323, -308, 308, -15, 15, -16, 16, null)) {
-            // array tests
-            // TODO: enable test for floats (_6, _7, _8, _13)
-            for (c <- Seq(2, 3, 4, 5, 9, 10, 11, 12, 15, 16, 17)) {
-              checkSparkAnswerAndOperator(s"select _${c}, round(_${c}, ${s}) 
FROM tbl")
-            }
-            // scalar tests
-            // Exclude the constant folding optimizer in order to actually execute the native round
-            // operations for scalar (literal) values.
-            // TODO: comment in the tests for float once supported
-            withSQLConf(
-              "spark.sql.optimizer.excludedRules" -> 
"org.apache.spark.sql.catalyst.optimizer.ConstantFolding") {
-              for (n <- Seq("0.0", "-0.0", "0.5", "-0.5", "1.2", "-1.2")) {
-                checkSparkAnswerAndOperator(s"select round(cast(${n} as 
tinyint), ${s}) FROM tbl")
-                // checkSparkAnswerAndCometOperators(s"select round(cast(${n} as float), ${s}) FROM tbl")
-                checkSparkAnswerAndOperator(
-                  s"select round(cast(${n} as decimal(38, 18)), ${s}) FROM 
tbl")
-                checkSparkAnswerAndOperator(
-                  s"select round(cast(${n} as decimal(20, 0)), ${s}) FROM tbl")
+        withSQLConf(CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "true") {

Review Comment:
   Yes. In the tests, if ALLOW_INCOMPATIBLE is not set we fall back to Spark whenever there is a ByteType or ShortType column in the data (Spark does not distinguish between signed and unsigned). In that case all tests that call `checkSparkAnswerAndOperator` fail, because the expectation is that the plan contains only Comet operators.
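   A minimal sketch of the pattern, using the suite's existing helpers (the query and column here are illustrative):

```scala
// Without COMET_SCAN_ALLOW_INCOMPATIBLE, the Byte/Short columns cause a
// fallback to Spark's scan and checkSparkAnswerAndOperator's
// "only Comet operators in the plan" assertion fails. Enabling it keeps the
// whole plan in Comet so the assertion can pass.
withSQLConf(CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "true") {
  withParquetTable(path.toString, "tbl") {
    checkSparkAnswerAndOperator("select _2, round(_2, 1) FROM tbl")
  }
}
```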



##########
spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala:
##########
@@ -117,7 +117,9 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     Seq(true, false).foreach { dictionaryEnabled =>
       withTempDir { dir =>
         val path = new Path(dir.toURI.toString, "test.parquet")
-        makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 10000)
+        withSQLConf(CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "false") {
+          makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 10000)
+        }

Review Comment:
   We did make the change in `makeParquetFileAllTypes` in a previous PR. This 
PR addresses the test cases that still failed. 



##########
spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala:
##########
@@ -740,28 +740,48 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar
       withTempDir { dir =>
         val path = new Path(dir.toURI.toString, "test.parquet")
         makeParquetFileAllTypes(path, false, 10000, 10010)
-
-        Seq(
-          $"_1",
-          $"_2",
-          $"_3",
-          $"_4",
-          $"_5",
-          $"_6",
-          $"_7",
-          $"_8",
-          $"_9",
-          $"_10",
-          $"_11",
-          $"_12",
-          $"_13",
-          $"_14",
-          $"_15",
-          $"_16",
-          $"_17",
-          $"_18",
-          $"_19",
-          $"_20").foreach { col =>
+        val fieldsToTest = if (CometSparkSessionExtensions.usingDataFusionParquetExec(conf)) {
+          Seq(
+            $"_1",
+            $"_4",
+            $"_5",
+            $"_6",
+            $"_7",
+            $"_8",
+            $"_11",
+            $"_12",
+            $"_13",
+            $"_14",
+            $"_15",
+            $"_16",
+            $"_17",
+            $"_18",
+            $"_19",
+            $"_20")

Review Comment:
   We are skipping ByteType and ShortType (both signed and unsigned). I doubt the TODO will ever be addressed (it appears unlikely to be fixed in arrow-rs), but I've added it.
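   The same selection could also be written as a filter; a sketch under the assumption that `_2`, `_3`, `_9`, and `_10` are the Byte/Short columns skipped in the diff above:

```scala
// Columns holding ByteType/ShortType data (signed and unsigned) are excluded
// when the DataFusion-based Parquet reader is active, since it does not read
// them back as those types.
val skipForDataFusion = Set(2, 3, 9, 10)
val fieldsToTest =
  if (CometSparkSessionExtensions.usingDataFusionParquetExec(conf)) {
    (1 to 20).filterNot(skipForDataFusion).map(i => $"_$i")
  } else {
    (1 to 20).map(i => $"_$i")
  }
```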



##########
spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala:
##########
@@ -97,13 +102,19 @@ abstract class ParquetReadSuite extends CometTestBase {
       StructType(
         Seq(
           StructField("f1", DecimalType.SYSTEM_DEFAULT),
-          StructField("f2", StringType))) -> false,
+          StructField("f2", StringType))) -> usingNativeIcebergCompat,
       MapType(keyType = LongType, valueType = DateType) -> false,
-      StructType(Seq(StructField("f1", ByteType), StructField("f2", 
StringType))) -> false,
+      StructType(
+        Seq(
+          StructField("f1", ByteType),
+          StructField("f2", StringType))) -> usingNativeIcebergCompat,
      MapType(keyType = IntegerType, valueType = BinaryType) -> false).foreach {
       case (dt, expected) =>
         assert(CometScanExec.isTypeSupported(dt) == expected)
-        assert(CometBatchScanExec.isTypeSupported(dt) == expected)
+        // usingDataFusionParquetExec does not support CometBatchScanExec yet
+        if (!usingDataFusionParquetExec(conf)) {

Review Comment:
   `CometScanExec.isTypeSupported != CometBatchScanExec.isTypeSupported` because `isAdditionallySupported` is overridden in `CometScanExec` and `CometNativeScan`.
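   A simplified sketch of that shape (names abridged and types illustrative; the real traits live in Comet's Spark integration layer):

```scala
import org.apache.spark.sql.types._

// Each scan node checks a common base set of types plus its own additions,
// so two scan implementations can legitimately disagree on the same DataType.
trait CometTypeSupport {
  protected def isAdditionallySupported(dt: DataType): Boolean = false
  final def isTypeSupported(dt: DataType): Boolean =
    baseSupported(dt) || isAdditionallySupported(dt)
  private def baseSupported(dt: DataType): Boolean = dt match {
    case IntegerType | LongType | DoubleType | StringType => true
    case _ => false
  }
}

// e.g. one scan path might additionally accept structs,
// while another keeps the default (false) and rejects them.
object HypotheticalScan extends CometTypeSupport {
  override protected def isAdditionallySupported(dt: DataType): Boolean =
    dt.isInstanceOf[StructType]
}
```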



##########
common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java:
##########
@@ -260,7 +260,8 @@ public void init() throws URISyntaxException, IOException {
     } ////// End get requested schema
 
     String timeZoneId = conf.get("spark.sql.session.timeZone");

Review Comment:
   We need to use the session timezone parameter for at least some conversions (especially those involving timestamp_ntz). The conversions of timestamp/timestamp_ntz between Arrow and Spark are somewhat convoluted and sometimes require the timezone offset to be applied to the values to make them consistent.
   It is safer to make sure the timezone parameter is passed to the native side so it can be applied when necessary.
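   Roughly the hand-off being described, sketched in Scala for brevity (the real code is in `NativeBatchReader.java`, and `withSessionTimeZone` is a hypothetical setter, not the actual native API):

```scala
import java.util.TimeZone
import org.apache.hadoop.conf.Configuration

// Resolve the session timezone (falling back to the JVM default) and hand it
// to the native reader so timestamp_ntz values can be shifted consistently
// whenever the Arrow <-> Spark conversion needs an offset applied.
def sessionTimeZoneId(conf: Configuration): String =
  Option(conf.get("spark.sql.session.timeZone")).getOrElse(TimeZone.getDefault.getID)

// Hypothetical usage:
// nativeReaderBuilder.withSessionTimeZone(sessionTimeZoneId(conf))
```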



##########
spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala:
##########
@@ -1001,7 +1012,7 @@ abstract class ParquetReadSuite extends CometTestBase {
                 Seq(StructField("_1", LongType, false), StructField("_2", 
DoubleType, false)))
 
             withParquetDataFrame(data, schema = Some(readSchema)) { df =>
-              if (enableSchemaEvolution) {
+              if (enableSchemaEvolution || usingDataFusionParquetExec(conf)) {

Review Comment:
   I suspect it may be incompatible. We will address this once we are able to run the Spark tests against different Spark versions; at the moment we are still clearing up the Comet test failures.



##########
spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala:
##########
@@ -97,13 +102,19 @@ abstract class ParquetReadSuite extends CometTestBase {
       StructType(
         Seq(
           StructField("f1", DecimalType.SYSTEM_DEFAULT),
-          StructField("f2", StringType))) -> false,
+          StructField("f2", StringType))) -> usingNativeIcebergCompat,
       MapType(keyType = LongType, valueType = DateType) -> false,
-      StructType(Seq(StructField("f1", ByteType), StructField("f2", 
StringType))) -> false,
+      StructType(
+        Seq(
+          StructField("f1", ByteType),
+          StructField("f2", StringType))) -> usingNativeIcebergCompat,
      MapType(keyType = IntegerType, valueType = BinaryType) -> false).foreach {

Review Comment:
   Correct


