kazuyukitanimura commented on code in PR #1415:
URL: https://github.com/apache/datafusion-comet/pull/1415#discussion_r1962234098


##########
spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala:
##########
@@ -97,13 +102,19 @@ abstract class ParquetReadSuite extends CometTestBase {
       StructType(
         Seq(
           StructField("f1", DecimalType.SYSTEM_DEFAULT),
-          StructField("f2", StringType))) -> false,
+          StructField("f2", StringType))) -> usingNativeIcebergCompat,
       MapType(keyType = LongType, valueType = DateType) -> false,
-      StructType(Seq(StructField("f1", ByteType), StructField("f2", StringType))) -> false,
+      StructType(
+        Seq(
+          StructField("f1", ByteType),
+          StructField("f2", StringType))) -> usingNativeIcebergCompat,
      MapType(keyType = IntegerType, valueType = BinaryType) -> false).foreach {
       case (dt, expected) =>
         assert(CometScanExec.isTypeSupported(dt) == expected)
-        assert(CometBatchScanExec.isTypeSupported(dt) == expected)
+        // usingDataFusionParquetExec does not support CometBatchScanExec yet
+        if (!usingDataFusionParquetExec(conf)) {

Review Comment:
   > usingDataFusionParquetExec does not support CometBatchScanExec yet
   
   I assume this means we are not supporting V2 yet. But since
   `CometScanExec.isTypeSupported` == `CometBatchScanExec.isTypeSupported`, I guess
   we can still pass this test regardless of `usingDataFusionParquetExec`?
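
   A minimal sketch of the simplification being suggested, assuming the two methods
   really do stay in sync (`supportedTypes` is just a placeholder name for the
   sequence of `(DataType, Boolean)` pairs built above):

   ```scala
   supportedTypes.foreach { case (dt, expected) =>
     assert(CometScanExec.isTypeSupported(dt) == expected)
     assert(CometBatchScanExec.isTypeSupported(dt) == expected)
   }
   ```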



##########
spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala:
##########
@@ -97,13 +102,19 @@ abstract class ParquetReadSuite extends CometTestBase {
       StructType(
         Seq(
           StructField("f1", DecimalType.SYSTEM_DEFAULT),
-          StructField("f2", StringType))) -> false,
+          StructField("f2", StringType))) -> usingNativeIcebergCompat,
       MapType(keyType = LongType, valueType = DateType) -> false,
-      StructType(Seq(StructField("f1", ByteType), StructField("f2", StringType))) -> false,
+      StructType(
+        Seq(
+          StructField("f1", ByteType),
+          StructField("f2", StringType))) -> usingNativeIcebergCompat,
      MapType(keyType = IntegerType, valueType = BinaryType) -> false).foreach {

Review Comment:
   Just checking, looks like `MapType` is not supported yet?



##########
spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala:
##########
@@ -117,7 +117,9 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     Seq(true, false).foreach { dictionaryEnabled =>
       withTempDir { dir =>
         val path = new Path(dir.toURI.toString, "test.parquet")
-        makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 10000)
+        withSQLConf(CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "false") {
+          makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 10000)
+        }

Review Comment:
   Do we use Spark (Comet) for `makeParquetFileAllTypes`? If so, should we add
   `CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "false"` inside
   `makeParquetFileAllTypes` so that the other test cases also avoid Comet-produced
   test files?
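
   A rough sketch of that idea (the real helper takes more parameters and its body
   is elided here; only the wrapping is the point):

   ```scala
   protected def makeParquetFileAllTypes(path: Path, dictionaryEnabled: Boolean, numRows: Int): Unit = {
     // Always write the test file with the incompatible scan disabled so that every
     // caller gets test data produced without Comet's incompatible scan path.
     withSQLConf(CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "false") {
       // ... existing Parquet-writing logic ...
     }
   }
   ```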



##########
spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala:
##########
@@ -740,28 +740,48 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar
       withTempDir { dir =>
         val path = new Path(dir.toURI.toString, "test.parquet")
         makeParquetFileAllTypes(path, false, 10000, 10010)
-
-        Seq(
-          $"_1",
-          $"_2",
-          $"_3",
-          $"_4",
-          $"_5",
-          $"_6",
-          $"_7",
-          $"_8",
-          $"_9",
-          $"_10",
-          $"_11",
-          $"_12",
-          $"_13",
-          $"_14",
-          $"_15",
-          $"_16",
-          $"_17",
-          $"_18",
-          $"_19",
-          $"_20").foreach { col =>
+        val fieldsToTest = if (CometSparkSessionExtensions.usingDataFusionParquetExec(conf)) {
+          Seq(
+            $"_1",
+            $"_4",
+            $"_5",
+            $"_6",
+            $"_7",
+            $"_8",
+            $"_11",
+            $"_12",
+            $"_13",
+            $"_14",
+            $"_15",
+            $"_16",
+            $"_17",
+            $"_18",
+            $"_19",
+            $"_20")

Review Comment:
   We are skipping _2, _3, _9, and _10; I guess that is due to unsupported unsigned ints.
   Perhaps we can add a TODO.
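
   Something along these lines, just so the gap is tracked (that these are the
   unsigned-int columns is my assumption, and the `else` branch keeping all twenty
   columns is inferred from the pre-change list):

   ```scala
   val fieldsToTest = if (CometSparkSessionExtensions.usingDataFusionParquetExec(conf)) {
     // TODO: add _2, _3, _9 and _10 back once unsigned integer types are supported
     // by the native DataFusion Parquet reader
     Seq($"_1", $"_4", $"_5", $"_6", $"_7", $"_8", $"_11", $"_12", $"_13",
       $"_14", $"_15", $"_16", $"_17", $"_18", $"_19", $"_20")
   } else {
     (1 to 20).map(i => $"_$i")
   }
   ```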



##########
spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala:
##########
@@ -1273,32 +1275,36 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
           -128,
           128,
           randomSize = 100)
-        withParquetTable(path.toString, "tbl") {
-          for (s <- Seq(-5, -1, 0, 1, 5, -1000, 1000, -323, -308, 308, -15, 15, -16, 16, null)) {
-            // array tests
-            // TODO: enable test for floats (_6, _7, _8, _13)
-            for (c <- Seq(2, 3, 4, 5, 9, 10, 11, 12, 15, 16, 17)) {
-              checkSparkAnswerAndOperator(s"select _${c}, round(_${c}, ${s}) FROM tbl")
-            }
-            // scalar tests
-            // Exclude the constant folding optimizer in order to actually execute the native round
-            // operations for scalar (literal) values.
-            // TODO: comment in the tests for float once supported
-            withSQLConf(
-              "spark.sql.optimizer.excludedRules" -> "org.apache.spark.sql.catalyst.optimizer.ConstantFolding") {
-              for (n <- Seq("0.0", "-0.0", "0.5", "-0.5", "1.2", "-1.2")) {
-                checkSparkAnswerAndOperator(s"select round(cast(${n} as tinyint), ${s}) FROM tbl")
-                // checkSparkAnswerAndCometOperators(s"select round(cast(${n} as float), ${s}) FROM tbl")
-                checkSparkAnswerAndOperator(
-                  s"select round(cast(${n} as decimal(38, 18)), ${s}) FROM tbl")
-                checkSparkAnswerAndOperator(
-                  s"select round(cast(${n} as decimal(20, 0)), ${s}) FROM tbl")
+        withSQLConf(CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "true") {

Review Comment:
   nit: is this related?



##########
spark/src/main/scala/org/apache/comet/parquet/CometParquetPartitionReaderFactory.scala:
##########
@@ -71,14 +71,26 @@ case class CometParquetPartitionReaderFactory(
   // Comet specific configurations
   private val batchSize = CometConf.COMET_BATCH_SIZE.get(sqlConf)
 
+  @transient private lazy val usingDataFusionReader: Boolean = {
+    val conf = broadcastedConf.value.value
+    conf.getBoolean(
+      CometConf.COMET_NATIVE_SCAN_ENABLED.key,
+      CometConf.COMET_NATIVE_SCAN_ENABLED.defaultValue.get) &&
+    conf
+      .get(
+        CometConf.COMET_NATIVE_SCAN_IMPL.key,
+        CometConf.COMET_NATIVE_SCAN_IMPL.defaultValueString)
+      .equalsIgnoreCase(CometConf.SCAN_NATIVE_ICEBERG_COMPAT)
+  }
  // This is only called at executor on a Broadcast variable, so we don't want it to be
   // materialized at driver.
   @transient private lazy val preFetchEnabled = {
     val conf = broadcastedConf.value.value
 
     conf.getBoolean(
       CometConf.COMET_SCAN_PREFETCH_ENABLED.key,
-      CometConf.COMET_SCAN_PREFETCH_ENABLED.defaultValue.get)
+      CometConf.COMET_SCAN_PREFETCH_ENABLED.defaultValue.get) &&
+    !usingDataFusionReader // Turn off prefetch if native_iceberg_compat is enabled

Review Comment:
   Just checking if we need to do the same for `SCAN_NATIVE_DATAFUSION`?
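
   If prefetch should indeed be bypassed for the fully native scan as well, one
   possible shape for the flag (just a sketch; whether this is wanted at all is the
   open question):

   ```scala
   @transient private lazy val usingNativeReader: Boolean = {
     val conf = broadcastedConf.value.value
     val scanImpl = conf.get(
       CometConf.COMET_NATIVE_SCAN_IMPL.key,
       CometConf.COMET_NATIVE_SCAN_IMPL.defaultValueString)
     conf.getBoolean(
       CometConf.COMET_NATIVE_SCAN_ENABLED.key,
       CometConf.COMET_NATIVE_SCAN_ENABLED.defaultValue.get) &&
     (scanImpl.equalsIgnoreCase(CometConf.SCAN_NATIVE_ICEBERG_COMPAT) ||
       scanImpl.equalsIgnoreCase(CometConf.SCAN_NATIVE_DATAFUSION))
   }
   ```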



##########
spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala:
##########
@@ -1001,7 +1012,7 @@ abstract class ParquetReadSuite extends CometTestBase {
                 Seq(StructField("_1", LongType, false), StructField("_2", DoubleType, false)))
 
             withParquetDataFrame(data, schema = Some(readSchema)) { df =>
-              if (enableSchemaEvolution) {
+              if (enableSchemaEvolution || usingDataFusionParquetExec(conf)) {

Review Comment:
   Hmm, does this mean that when `usingDataFusionParquetExec == true` we always use
   schema evolution regardless of `COMET_SCHEMA_EVOLUTION_ENABLED`? Does that mean it
   will be incompatible with Spark 3.x?
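
   If the intent is to keep honoring the flag, a sketch of what the guard could look
   like instead (assuming `COMET_SCHEMA_EVOLUTION_ENABLED` can be read here the same
   way as the other `CometConf` entries), with the test then branching on
   `schemaEvolutionApplies`:

   ```scala
   val schemaEvolutionApplies =
     enableSchemaEvolution ||
       (usingDataFusionParquetExec(conf) &&
         CometConf.COMET_SCHEMA_EVOLUTION_ENABLED.get(conf))
   ```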


