[ https://issues.apache.org/jira/browse/ARROW-16700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17562191#comment-17562191 ]
Jeroen van Straten commented on ARROW-16700:
--------------------------------------------
tl;dr: it looks like min/max itself is broken and only aggregates [uninitialized
garbage?] and the last partition it sees; every {{Consume}} call casually ignores
and overwrites {{this->state}}. Extraordinary claims demand extraordinary evidence,
especially since I only started looking at Acero in depth and installed R about
five hours ago, so here's my complete analysis for posterity...
> For example, how does one resolve the query {{SELECT * FROM measurements
> WHERE temp == 50}} when all they know for a batch is {{25 < temp < 75}}.
I would assume that in the case of a guarantee like {{25 < temp < 75}} the
column {{temp}} is not optimized out of the physical schema as it would be for
{{temp = 50}}, otherwise the data simply isn't there anymore. So, I don't see
how
> However, no value is ever set on this column and so it ends up getting set
> implicitly to {{NULL}}.
would apply in this case, and I would expect aggregation to work
correctly under those circumstances. But I'm not sure how to verify this.
If the above assessment is true, it seems to me that the problem is that either:
* "trivial" guarantees aren't respected consistently throughout Acero, which
is not acceptable, because columns may already have been optimized out of the
physical schema under the assumption that all their values are known via the
guarantee; or
* if you take
[this|https://github.com/apache/arrow/blob/897a4c0ce73c3fe07872beee2c1d2128e44f6dd4/cpp/src/arrow/compute/exec.h#L201]
description of ExecBatch::guarantee at face value, the problem is that the
guarantee doesn't actually evaluate to true at all in this case, because the
columns it asserts a constant value for are in fact optimized out and would
thus evaluate to null. This conflicts with [the way Scanner deals with
guarantees|https://github.com/apache/arrow/blob/897a4c0ce73c3fe07872beee2c1d2128e44f6dd4/cpp/src/arrow/dataset/scanner.cc#L911-L914],
though.
Put differently, different parts of the codebase seem to treat {{guarantee}} in
two incompatible ways (at least according to the comments).
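To make the second reading concrete, here's a minimal sketch (my own illustration, not code from the issue; it assumes the Arrow C++ headers of this era, where expressions live under {{arrow/compute/exec/expression.h}}) of the mechanism filter and project rely on: {{SimplifyWithGuarantee}} folds a reference to a column that the guarantee pins to a constant into a literal, which is how a ProjectNode can re-materialize a partition column that is no longer present in the physical schema.
{code:cpp}
#include <iostream>

#include <arrow/api.h>
#include <arrow/compute/exec/expression.h>  // path has moved in newer versions

namespace cp = arrow::compute;

arrow::Status Demo() {
  auto schema = arrow::schema({arrow::field("year", arrow::int32())});

  // The guarantee a scan might attach for the partition directory year=2012.
  ARROW_ASSIGN_OR_RAISE(
      auto guarantee,
      cp::equal(cp::field_ref("year"), cp::literal(2012)).Bind(*schema));

  // A projection expression that merely references the (possibly absent) column.
  ARROW_ASSIGN_OR_RAISE(auto projection, cp::field_ref("year").Bind(*schema));

  // filter/project use this to fold the field reference into a constant.
  ARROW_ASSIGN_OR_RAISE(auto simplified,
                        cp::SimplifyWithGuarantee(projection, guarantee));
  std::cout << simplified.ToString() << std::endl;  // expect: 2012
  return arrow::Status::OK();
}

int main() { return Demo().ok() ? 0 : 1; }
{code}
Any node that doesn't go through such a simplification step only ever sees the physical batch, in which the column is simply absent (i.e. null).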
Looking through the code, I don't see _any_ mention of guarantees in nodes other
than filter and project, so I would expect all nodes to fail in this situation, _unless_
they end up following a projection node, which would materialize the columns
based on the expressions derived from the guarantee. It looks like the group-by
example only works because a projection is inserted in that case:
{code}
5:SinkNode{}
4:ProjectNode{projection=[some_nulls, min_year]}
3:ProjectNode{projection=[some_nulls, min_year]}
2:GroupByNode{keys=["some_nulls"], aggregates=[
hash_min(min_year, {skip_nulls=false, min_count=0}),
]}
1:ProjectNode{projection=["min_year": year, some_nulls]}
0:SourceNode{}
{code}
... but something isn't adding up for the failing case, because I'm getting:
{code}
4:SinkNode{}
3:ProjectNode{projection=[n, min_year, min_month, min_day, max_year, max_month, max_day]}
2:ScalarAggregateNode{aggregates=[
sum(n, {skip_nulls=true, min_count=1}),
min(min_year, {skip_nulls=false, min_count=0}),
min(min_month, {skip_nulls=false, min_count=0}),
min(min_day, {skip_nulls=false, min_count=0}),
max(max_year, {skip_nulls=false, min_count=0}),
max(max_month, {skip_nulls=false, min_count=0}),
max(max_day, {skip_nulls=false, min_count=0}),
]}
1:ProjectNode{projection=["n": 1, "min_year": year, "min_month": month, "min_day": day, "max_year": year, "max_month": month, "max_day": day]}
0:SourceNode{}
{code}
In fact, judging by the input and output of 1:ProjectNode in either case, it
looks like it'd be difficult to make a plan that doesn't need one, since it
would normally at least get rid of the fragment source information.
And indeed, if I spam sufficient debug output, I see that 1:ProjectNode _is_
materializing the columns accordingly, and that 2:ScalarAggregateNode _is_
getting completely-materialized inputs passed to InputReceived. Hmmm...
Some more debug prints and clicking through code later I find that the bad
minima and maxima are originating from
[here|https://github.com/apache/arrow/blob/897a4c0ce73c3fe07872beee2c1d2128e44f6dd4/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h#L440-L451].
Each partition corresponds to its own call. I expected something to be subtly
wrong here, but it doesn't appear to be subtle at all: {{local}} is created
[here|https://github.com/apache/arrow/blob/897a4c0ce73c3fe07872beee2c1d2128e44f6dd4/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h#L440]
and is later used to overwrite {{this->state}}
[here|https://github.com/apache/arrow/blob/897a4c0ce73c3fe07872beee2c1d2128e44f6dd4/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h#L445]
or
[here|https://github.com/apache/arrow/blob/897a4c0ce73c3fe07872beee2c1d2128e44f6dd4/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h#L450],
without any reference to the previous value of {{this->state}}! I suppose what
those {{this->state = local}} lines _should_ read is {{this->state \+= local}},
since {{operator+=}} is overloaded for combining states, so that's what I'll make a
PR for.
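For posterity, here's a heavily simplified sketch of the pattern and the fix. The type and member names ({{MinMaxState}}, {{MergeOne}}, etc.) are hypothetical stand-ins, not the actual templated code behind the links above, but the shape of the bug is the same:
{code:cpp}
#include <algorithm>
#include <cstdint>
#include <limits>

// Hypothetical stand-in for the real min/max kernel state; the real one is
// templated over the value type and lives in aggregate_basic_internal.h.
struct MinMaxState {
  int64_t min = std::numeric_limits<int64_t>::max();
  int64_t max = std::numeric_limits<int64_t>::min();
  bool has_values = false;

  // operator+= combines two partial states (what MergeFrom relies on).
  MinMaxState& operator+=(const MinMaxState& rhs) {
    min = std::min(min, rhs.min);
    max = std::max(max, rhs.max);
    has_values = has_values || rhs.has_values;
    return *this;
  }

  void MergeOne(int64_t value) {
    min = std::min(min, value);
    max = std::max(max, value);
    has_values = true;
  }
};

struct MinMaxImplSketch {
  MinMaxState state;

  // Consume is called once per input batch; with one batch per fragment,
  // every partition lands in a separate call.
  void Consume(int64_t scalar_value) {
    MinMaxState local;
    local.MergeOne(scalar_value);
    // Bug: "state = local" throws away everything accumulated so far and
    // keeps only this batch's result:
    //   state = local;
    // Fix: merge the new partial result into the running state instead:
    state += local;
  }
};

int main() {
  MinMaxImplSketch impl;
  impl.Consume(2010);  // e.g. the batch from the year=2010 partition
  impl.Consume(2023);  // e.g. the batch from the year=2023 partition
  // With the fix, min == 2010 and max == 2023; with the bug, both reflect
  // only the last batch consumed.
  return (impl.state.min == 2010 && impl.state.max == 2023) ? 0 : 1;
}
{code}
With the plain assignment, only whichever batch happens to be consumed last survives, which matches the kind of values seen in the reproducer (e.g. {{min_year}} coming out as 2023 or 2016).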
The issue Weston pointed out _may_ still be a problem after that fix, though it
would be far less likely to hit (if it's possible at all), given how often a
project node ends up placed immediately after a scan node. So I guess there
should be a follow-up issue for that?
> [C++] [R] [Datasets] aggregates on partitioning columns
> -------------------------------------------------------
>
> Key: ARROW-16700
> URL: https://issues.apache.org/jira/browse/ARROW-16700
> Project: Apache Arrow
> Issue Type: Bug
> Components: C++, R
> Reporter: Jonathan Keane
> Priority: Blocker
> Fix For: 9.0.0, 8.0.1
>
>
> When summarizing a whole dataset (without group_by) with an aggregate over a
> partitioning column, arrow returns wrong data:
> {code:r}
> library(arrow, warn.conflicts = FALSE)
> library(dplyr, warn.conflicts = FALSE)
> df <- expand.grid(
>   some_nulls = c(0L, 1L, 2L),
>   year = 2010:2023,
>   month = 1:12,
>   day = 1:30
> )
> path <- tempfile()
> dir.create(path)
> write_dataset(df, path, partitioning = c("year", "month"))
> ds <- open_dataset(path)
> # with arrow the mins/maxes are off for partitioning columns
> ds %>%
>   summarise(n = n(), min_year = min(year), min_month = min(month),
>             min_day = min(day), max_year = max(year), max_month = max(month),
>             max_day = max(day)) %>%
>   collect()
> #> # A tibble: 1 × 7
> #> n min_year min_month min_day max_year max_month max_day
> #> <int> <int> <int> <int> <int> <int> <int>
> #> 1 15120 2023 1 1 2023 12 30
> # compared to what we get with dplyr
> df %>%
>   summarise(n = n(), min_year = min(year), min_month = min(month),
>             min_day = min(day), max_year = max(year), max_month = max(month),
>             max_day = max(day)) %>%
>   collect()
> #> n min_year min_month min_day max_year max_month max_day
> #> 1 15120 2010 1 1 2023 12 30
> # even min alone is off:
> ds %>%
>   summarise(min_year = min(year)) %>%
>   collect()
> #> # A tibble: 1 × 1
> #> min_year
> #> <int>
> #> 1 2016
>
> # but non-partitioning columns are fine:
> ds %>%
>   summarise(min_day = min(day)) %>%
>   collect()
> #> # A tibble: 1 × 1
> #> min_day
> #> <int>
> #> 1 1
>
>
> # But with a group_by, this seems ok
> ds %>%
>   group_by(some_nulls) %>%
>   summarise(min_year = min(year)) %>%
>   collect()
> #> # A tibble: 3 × 2
> #> some_nulls min_year
> #> <int> <int>
> #> 1 0 2010
> #> 2 1 2010
> #> 3 2 2010
> {code}