comphead commented on issue #4718:
URL:
https://github.com/apache/datafusion-comet/issues/4718#issuecomment-4815169522
I was able to reproduce this locally with the following test:
```scala
// Local reproduction of an "offset overflow" failure in Comet's native hash aggregate.
//
// The test writes a Parquet file of wide (384-byte) string keys with plain Spark, then reads
// it back through Comet and runs CUBE over 9 columns together with COUNT(DISTINCT). With the
// memory pool set to "unbounded", the query is expected to FAIL with an error whose message
// contains "offset overflow"; the test passes only when that error is observed.
//
// Rough sizing (back-of-the-envelope, not measured): CUBE over 9 columns expands every input
// row into 2^9 = 512 grouping sets, 256 of which keep `userId`. 256 * 30000 rows * 384 bytes
// is roughly 2.9 GB of `userId` key bytes, which exceeds i32::MAX (2147483647).
test(
  "CUBE(9) + COUNT(DISTINCT) wide Utf8 keys with unbounded pool: " +
    "offset overflow buffer > i32::MAX") {
  // `col` is imported here as well so the snippet does not rely on a file-level import.
  import org.apache.spark.sql.functions.{col, count_distinct, grouping}

  val numRows = 30000L
  val wideBytes = 384
  // Pad each value so that prefix + zero-padded id is exactly `wideBytes` characters long.
  val userPadLen = wideBytes - "user-".length
  val measurePadLen = wideBytes - "meas-".length

  withTempDir { dir =>
    val path = new Path(dir.toURI.toString, "events").toString

    // --- materialize to Parquet (Comet OFF so the writer is plain Spark) ---
    // dictionary disabled so Comet's scan emits full-width Utf8 values into the
    // group-key accumulator (dictionary-encoded scan would only push dict keys).
    withSQLConf(
      SQLConf.SHUFFLE_PARTITIONS.key -> "1",
      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
      "parquet.enable.dictionary" -> "false",
      CometConf.COMET_ENABLED.key -> "false") {
      spark
        .range(0, numRows, 1, 1)
        .selectExpr(
          s"concat('user-', lpad(cast(id as string), $userPadLen, '0')) as userId",
          s"concat('meas-', lpad(cast(id as string), $measurePadLen, '0')) as distinctMeasure",
          "cast(id % 4 as string) as dim1",
          "cast(id % 3 as string) as dim2",
          "cast(id % 6 as string) as dim3",
          "cast(id % 5 as string) as dim4",
          "cast(id % 7 as string) as dim5",
          "cast(id % 2 as string) as dim6",
          "cast(id % 8 as string) as dim7",
          "cast(id % 9 as string) as dim8")
        .write
        .mode("overwrite")
        .parquet(path)
    }

    // --- now read back and aggregate via Comet with an unbounded memory pool ---
    withSQLConf(
      SQLConf.SHUFFLE_PARTITIONS.key -> "1",
      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
      SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "false",
      CometConf.COMET_ENABLED.key -> "true",
      CometConf.COMET_EXEC_ENABLED.key -> "true",
      CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
      // Critical: disable the only mechanism that resets the ByteGroupValueBuilder
      // byte buffer before i32::MAX is reached.
      CometConf.COMET_ONHEAP_MEMORY_POOL_TYPE.key -> "unbounded") {
      val events = spark.read.parquet(path).coalesce(1)
      withTempView("events") {
        events.createOrReplaceTempView("events")

        val dims = Seq("dim1", "dim2", "dim3", "dim4", "dim5", "dim6", "dim7", "dim8")
        val cubeCols = dims :+ "userId"
        // One grouping() flag per cube column, exposed as `__g_<column>`.
        val groupingFlags = cubeCols.map(c => grouping(col(c)).as(s"__g_$c"))

        val df = spark
          .table("events")
          .cube(cubeCols.map(col): _*)
          .agg(count_distinct(col("distinctMeasure")).as("uniq_measure"), groupingFlags: _*)
          // grouping(userId) = 0 keeps only the grouping sets that still group by userId.
          .filter("__g_userId = 0")

        // Guardrail: fail loudly if Comet did NOT take the partial aggregate, so a
        // future fallback regression cannot silently turn this test into a no-op.
        val plan = df.queryExecution.executedPlan
        val cometAggs = collectWithSubqueries(plan) { case a: CometHashAggregateExec => a }
        assert(
          cometAggs.nonEmpty,
          s"Expected at least one CometHashAggregateExec in plan, got:\n$plan")

        // The query is expected to fail; capture whatever is thrown for inspection.
        val e = intercept[Throwable] {
          df.collect()
        }
        // Print the raw error so the repro output shows exactly what was raised.
        println(e)

        // True when the throwable's message mentions the overflow (null message => false).
        def matches(t: Throwable): Boolean = {
          // ByteGroupValueBuilder<i32> -> exec_datafusion_err! with the full wording
          //   "offset overflow, buffer size > 2147483647" (multi_group_by/bytes.rs:202)
          // GroupValuesRows fallback -> arrow-row variable.rs:288 panic with bare
          //   "offset overflow"
          // Accept either, since dispatch between the two depends on schema/agg shape.
          Option(t.getMessage).exists(_.contains("offset overflow"))
        }

        // Flattens `t`, its chain of causes, and the suppressed exceptions attached to
        // each element of that chain into one list.
        def walk(t: Throwable): List[Throwable] = {
          // getCause is a Java API that returns null at the end of the chain.
          val causes = Iterator.iterate(t)(_.getCause).takeWhile(_ != null).toList
          // getSuppressed returns an empty array (never null) when nothing is suppressed.
          causes ++ causes.flatMap(_.getSuppressed.toList)
        }

        val chain = walk(e)
        val chainDescription = chain
          .map(t => s" ${t.getClass.getName}: ${t.getMessage}")
          .mkString("\n")
        assert(
          chain.exists(matches),
          "Expected an 'offset overflow' error from DataFusion's group-values, " +
            s"got:\n$chainDescription")
      }
    }
  }
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]