comphead commented on issue #4718:
URL: 
https://github.com/apache/datafusion-comet/issues/4718#issuecomment-4815169522

   I was able to have local repro 
   
   
   ```
   test(
       "CUBE(9) + COUNT(DISTINCT) wide Utf8 keys with unbounded pool: offset 
overflow buffer > i32::MAX") {
       import org.apache.spark.sql.functions.{count_distinct, grouping}
       val numRows = 30000L
       val wideBytes = 384
       val userPadLen = wideBytes - "user-".length
       val measurePadLen = wideBytes - "meas-".length
       withTempDir { dir =>
         val path = new Path(dir.toURI.toString, "events").toString
         // --- materialize to Parquet (Comet OFF so the writer is plain Spark) 
---
         // dictionary disabled so Comet's scan emits full-width Utf8 values 
into the
         // group-key accumulator (dictionary-encoded scan would only push dict 
keys).
         withSQLConf(
           SQLConf.SHUFFLE_PARTITIONS.key -> "1",
           SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
           "parquet.enable.dictionary" -> "false",
           CometConf.COMET_ENABLED.key -> "false") {
           spark
             .range(0, numRows, 1, 1)
             .selectExpr(
               s"concat('user-', lpad(cast(id as string), $userPadLen, '0')) as 
userId",
               s"concat('meas-', lpad(cast(id as string), $measurePadLen, '0')) 
as distinctMeasure",
               "cast(id % 4 as string) as dim1",
               "cast(id % 3 as string) as dim2",
               "cast(id % 6 as string) as dim3",
               "cast(id % 5 as string) as dim4",
               "cast(id % 7 as string) as dim5",
               "cast(id % 2 as string) as dim6",
               "cast(id % 8 as string) as dim7",
               "cast(id % 9 as string) as dim8")
             .write
             .mode("overwrite")
             .parquet(path)
         }
         // --- now read back and aggregate via Comet with an unbounded memory 
pool ---
         withSQLConf(
           SQLConf.SHUFFLE_PARTITIONS.key -> "1",
           SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
           SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "false",
           CometConf.COMET_ENABLED.key -> "true",
           CometConf.COMET_EXEC_ENABLED.key -> "true",
           CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
           // Critical: disable the only mechanism that resets the 
ByteGroupValueBuilder
           // byte buffer before i32::MAX is reached.
           CometConf.COMET_ONHEAP_MEMORY_POOL_TYPE.key -> "unbounded") {
           val events = spark.read.parquet(path).coalesce(1)
           withTempView("events") {
             events.createOrReplaceTempView("events")
             val dims = Seq("dim1", "dim2", "dim3", "dim4", "dim5", "dim6", 
"dim7", "dim8")
             val cubeCols = dims :+ "userId"
             val groupingFlags = cubeCols.map(c => 
grouping(col(c)).as(s"__g_$c"))
             val df = spark
               .table("events")
               .cube(cubeCols.map(col): _*)
               .agg(count_distinct(col("distinctMeasure")).as("uniq_measure"), 
groupingFlags: _*)
               .filter("__g_userId = 0")
   
             // Guardrail: fail loudly if Comet did NOT take the partial 
aggregate, so a
             // future fallback regression cannot silently turn this test into 
a no-op.
             val plan = df.queryExecution.executedPlan
             val cometAggs = collectWithSubqueries(plan) { case a: 
CometHashAggregateExec => a }
             assert(
               cometAggs.nonEmpty,
               s"Expected at least one CometHashAggregateExec in plan, 
got:\n$plan")
   
             val e = intercept[Throwable] {
               df.collect()
             }
   
             println(e)
   
             def matches(t: Throwable): Boolean = {
               val m = Option(t.getMessage).getOrElse("")
               // ByteGroupValueBuilder<i32> -> exec_datafusion_err! with the 
full wording
               //   "offset overflow, buffer size > 2147483647"   
(multi_group_by/bytes.rs:202)
               // GroupValuesRows fallback -> arrow-row variable.rs:288 panic 
with bare
               //   "offset overflow"
               // Accept either, since dispatch between the two depends on 
schema/agg shape.
               m.contains("offset overflow")
             }
   
             def walk(t: Throwable): List[Throwable] = {
               val causes = Iterator.iterate(t: 
Throwable)(_.getCause).takeWhile(_ != null).toList
               causes ++ causes.flatMap(c => 
Option(c.getSuppressed).toList.flatten)
             }
   
             val chain = walk(e)
             assert(
               chain.exists(matches),
               s"Expected an 'offset overflow' error from DataFusion's 
group-values, " +
                 s"got:\n${chain.map(t => s"  ${t.getClass.getName}: 
${t.getMessage}").mkString("\n")}")
           }
         }
       }
     }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to