comphead commented on issue #2667:
URL: 
https://github.com/apache/datafusion-comet/issues/2667#issuecomment-3483326399

   Localized the issue with the following test:
   
   ```
   // Reproduction for the TPC-DS Q69 wrong-result report: runs the query with the
   // Comet settings listed below and prints the result ordered by every output
   // column, so the printed rows appear in a deterministic order.
   test("TPCDS Q69 correctness") {

       setupTPC_DS()

       withSQLConf(
         CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
         "spark.comet.cast.allowIncompatible" -> "true",
         "spark.comet.enabled" -> "true",
         "spark.comet.exec.replaceSortMergeJoin" -> "true",
         "spark.comet.exec.shuffle.enableFastEncoding" -> "true",
         // NOTE(review): presumably the same key as COMET_EXEC_SHUFFLE_ENABLED.key
         // above; confirm and drop one of the two entries.
         "spark.comet.exec.shuffle.enabled" -> "true",
         "spark.comet.exec.shuffle.fallbackToColumnar" -> "true",
         "spark.comet.explain.native.enabled" -> "true",
         "spark.comet.explainFallback.enabled" -> "true",
         "spark.sql.adaptive.enabled" -> "false",
         // The key must not carry a leading "conf " (a leftover from a `--conf` flag);
         // with that prefix the scan implementation setting was never applied.
         "spark.comet.scan.impl" -> "native_iceberg_compat",
         "spark.comet.batchSize" -> "32768",
         "spark.comet.columnar.shuffle.batch.size" -> "32768",
         // NOTE(review): the shuffle manager is normally fixed when the Spark context
         // starts; verify that setting it here has any effect.
         "spark.shuffle.manager" ->
           "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager",
         "spark.comet.logFallbackReasons.enabled" -> "true") {

         val df = spark
           .sql("""
                  |select
                  |  cd_gender,
                  |  cd_marital_status,
                  |  cd_education_status,
                  |  count(*) cnt1,
                  |  cd_purchase_estimate,
                  |  count(*) cnt2,
                  |  cd_credit_rating,
                  |  count(*) cnt3
                  | from
                  |  customer c,customer_address ca,customer_demographics
                  | where
                  |  c.c_current_addr_sk = ca.ca_address_sk and
                  |  ca_state in ('IN','VA','MS') and
                  |  cd_demo_sk = c.c_current_cdemo_sk and
                  |  exists (select *
                  |          from store_sales,date_dim
                  |          where c.c_customer_sk = ss_customer_sk and
                  |                ss_sold_date_sk = d_date_sk and
                  |                d_year = 2002 and
                  |                d_moy between 2 and 2+2)   and (not exists (select *
                  |            from web_sales,date_dim
                  |            where c.c_customer_sk = ws_bill_customer_sk and
                  |                  ws_sold_date_sk = d_date_sk and
                  |                  d_year = 2002 and
                  |                  d_moy between 2 and 2+2))
                  | group by cd_gender,
                  |          cd_marital_status,
                  |          cd_education_status,
                  |          cd_purchase_estimate,
                  |          cd_credit_rating
                  |""".stripMargin)

         // Sort on all columns so the grouped rows are shown in a stable order.
         df.orderBy(df.columns.map(col): _*).show(false)
       }
   }
   ```
   
   Comet output is wrong when these settings are applied:
   ```
         "spark.comet.exec.replaceSortMergeJoin" -> "true",
         "spark.sql.adaptive.enabled" -> "false",
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to