[ 
https://issues.apache.org/jira/browse/ARROW-16700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17562191#comment-17562191
 ] 

Jeroen van Straten commented on ARROW-16700:
--------------------------------------------

tl;dr it looks like min/max itself is broken and only aggregates [uninitialized 
garbage?] and the last partition it sees; every Consume call casually ignores 
and overrides this->state. Extraordinary claims demand extraordinary evidence, 
especially since I only started looking at Acero in-depth and installed R about 
five hours ago, so here's my complete analysis for posterity...

> For example, how does one resolve the query {{SELECT * FROM measurements 
> WHERE temp == 50}} when all they know for a batch is {{25 < temp < 75}}.

I would assume that in the case of a guarantee like {{25 < temp < 75}} the 
column {{temp}} is not optimized out of the physical schema as it would be for 
{{temp = 50}}, otherwise the data simply isn't there anymore. So, I don't see 
how

> However, no value is ever set on this column and so it ends up getting set 
> implicitly to {{NULL}}.

would apply in this case, and would expect that aggregation would work 
correctly under those circumstances. But I'm not sure how to verify this.

If the above assessment is true, it seems to me that the problem is that either:

 * "trivial" guarantees aren't respected consistently throughout Acero, which 
is invalid because columns may already have been optimized out of the physical 
schema under the assumption that all values are known via the guarantee; or
 * if you take 
[this|https://github.com/apache/arrow/blob/897a4c0ce73c3fe07872beee2c1d2128e44f6dd4/cpp/src/arrow/compute/exec.h#L201]
 description of ExecBatch::guarantee at face value, the problem is that the 
guarantee doesn't actually evaluate to true at all in this case, because the 
columns it asserts a constant value for are in fact optimized out and would 
thus evaluate to null. This conflicts with [the way Scanner deals with 
guarantees|https://github.com/apache/arrow/blob/897a4c0ce73c3fe07872beee2c1d2128e44f6dd4/cpp/src/arrow/dataset/scanner.cc#L911-L914],
 though.

Put differently, different parts of the codebase seem to treat {{guarantee}} in 
two incompatible ways (at least according to the comments).
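
To make the two readings concrete, here is a minimal sketch. It is not Arrow code: {{Batch}}, {{ReadPhysical}}, {{ReadWithGuarantee}} and {{GuaranteeHolds}} are hypothetical names, and a batch is reduced to one value per column for brevity. It only illustrates why a guarantee on a column that was optimized out of the physical schema cannot both "fill in" that column and "evaluate to true" on the batch as stored.

```cpp
#include <map>
#include <optional>
#include <string>

// Hypothetical stand-in for a batch: physically present columns plus
// "column == constant" facts carried as the guarantee.
struct Batch {
  std::map<std::string, int> columns;
  std::map<std::string, int> guarantee;
};

// A column missing from the physical schema reads as null.
inline std::optional<int> ReadPhysical(const Batch& b, const std::string& name) {
  auto it = b.columns.find(name);
  if (it == b.columns.end()) return std::nullopt;
  return it->second;
}

// Reading 1: the guarantee stands in for columns that were optimized out.
inline std::optional<int> ReadWithGuarantee(const Batch& b, const std::string& name) {
  if (auto v = ReadPhysical(b, name)) return v;
  auto it = b.guarantee.find(name);
  if (it == b.guarantee.end()) return std::nullopt;
  return it->second;
}

// Reading 2: the guarantee is a predicate evaluated on the physical batch.
// A missing column makes the comparison null rather than true.
inline std::optional<bool> GuaranteeHolds(const Batch& b) {
  for (const auto& [name, constant] : b.guarantee) {
    auto v = ReadPhysical(b, name);
    if (!v) return std::nullopt;
    if (*v != constant) return false;
  }
  return true;
}
```

With a batch that physically holds only {{day}} but carries the guarantee {{year == 2010}}, the first reading yields 2010 for {{year}}, while under the second reading the guarantee itself evaluates to null.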

Looking through the code I don't see _any_ mention of guarantees in nodes other 
than filter and project, so I would expect all nodes to fail on this, _unless_ 
they end up following a projection node, which would materialize the columns 
based on the expressions derived from the guarantee. It looks like the group-by 
example only works because a projection is inserted in that case:

{code}
 5:SinkNode{}
  4:ProjectNode{projection=[some_nulls, min_year]}
    3:ProjectNode{projection=[some_nulls, min_year]}
      2:GroupByNode{keys=["some_nulls"], aggregates=[
        hash_min(min_year, {skip_nulls=false, min_count=0}),
      ]}
        1:ProjectNode{projection=["min_year": year, some_nulls]}
          0:SourceNode{}
{code}

... but something isn't adding up for the failing case, because I'm getting:

{code}
4:SinkNode{}
  3:ProjectNode{projection=[n, min_year, min_month, min_day, max_year, max_month, max_day]}
    2:ScalarAggregateNode{aggregates=[
        sum(n, {skip_nulls=true, min_count=1}),
        min(min_year, {skip_nulls=false, min_count=0}),
        min(min_month, {skip_nulls=false, min_count=0}),
        min(min_day, {skip_nulls=false, min_count=0}),
        max(max_year, {skip_nulls=false, min_count=0}),
        max(max_month, {skip_nulls=false, min_count=0}),
        max(max_day, {skip_nulls=false, min_count=0}),
]}
      1:ProjectNode{projection=["n": 1, "min_year": year, "min_month": month, "min_day": day, "max_year": year, "max_month": month, "max_day": day]}
        0:SourceNode{}
{code}

In fact, judging by the input and output of 1:ProjectNode in either case, it 
looks like it'd be difficult to make a plan that doesn't need one, since it 
would normally at least get rid of the fragment source information.

And indeed, if I add enough debug output, I see that 1:ProjectNode _is_ 
materializing the columns accordingly, and 2:ScalarAggregateNode _is_ 
actually getting completely-materialized inputs passed to InputReceived. Hmmm...

Some more debug prints and clicking through code later I find that the bad 
minima and maxima are originating from 
[here|https://github.com/apache/arrow/blob/897a4c0ce73c3fe07872beee2c1d2128e44f6dd4/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h#L440-L451].
 Each partition corresponds to its own call. I expected something to be subtly 
wrong here, but it doesn't appear to be subtle at all: local is created 
[here|https://github.com/apache/arrow/blob/897a4c0ce73c3fe07872beee2c1d2128e44f6dd4/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h#L440]
 and is later used to override this->state 
[here|https://github.com/apache/arrow/blob/897a4c0ce73c3fe07872beee2c1d2128e44f6dd4/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h#L445]
 or 
[here|https://github.com/apache/arrow/blob/897a4c0ce73c3fe07872beee2c1d2128e44f6dd4/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h#L450],
 without any reference to the previous value of this->state! I suppose what 
those {{this->state = local}} lines _should_ read is {{this->state += local}}, 
since operator+= is overloaded for combining states, so that's what I'll make a 
PR for.
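
A minimal, single-threaded sketch of the difference, under the assumption that each Consume call receives one partition. This is not the Arrow kernel: {{MinMaxState}}, {{BuggyAggregator}}, {{FixedAggregator}} and {{AggregateAll}} are simplified, hypothetical stand-ins, and the real kernel also handles nulls, options and multiple types.

```cpp
#include <algorithm>
#include <cassert>
#include <limits>
#include <vector>

// Hypothetical stand-in for the kernel's min/max state.
struct MinMaxState {
  int min = std::numeric_limits<int>::max();
  int max = std::numeric_limits<int>::min();

  void MergeOne(int v) {
    min = std::min(min, v);
    max = std::max(max, v);
  }

  // Combine two partial states into one.
  MinMaxState& operator+=(const MinMaxState& rhs) {
    min = std::min(min, rhs.min);
    max = std::max(max, rhs.max);
    return *this;
  }
};

// Mirrors the pattern described above: every call overwrites the state.
struct BuggyAggregator {
  MinMaxState state;
  void Consume(const std::vector<int>& batch) {
    MinMaxState local;
    for (int v : batch) local.MergeOne(v);
    state = local;  // earlier batches are discarded
  }
};

// The proposed change: merge the per-call result into the running state.
struct FixedAggregator {
  MinMaxState state;
  void Consume(const std::vector<int>& batch) {
    MinMaxState local;
    for (int v : batch) local.MergeOne(v);
    state += local;
  }
};

// Feed every batch to a fresh aggregator and return its final state.
template <typename Agg>
MinMaxState AggregateAll(const std::vector<std::vector<int>>& batches) {
  Agg agg;
  for (const auto& batch : batches) agg.Consume(batch);
  return agg.state;
}

// Three single-value "partitions", consumed one per call.
inline void Demo() {
  const std::vector<std::vector<int>> partitions = {{2010}, {2016}, {2023}};
  assert(AggregateAll<BuggyAggregator>(partitions).min == 2023);  // last batch wins
  assert(AggregateAll<FixedAggregator>(partitions).min == 2010);
}
```

In this sketch the buggy variant reports whatever the last consumed batch contained, which would also explain why the reported minimum depends on the order in which partitions happen to arrive.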

The issue Weston pointed out _may_ still be an issue after that though, just a 
far less likely one to hit (if it's possible at all) because of how likely it 
is that a project node will be placed immediately after a scan node. So I guess 
there should be a followup issue for that?


> [C++] [R] [Datasets] aggregates on partitioning columns
> -------------------------------------------------------
>
>                 Key: ARROW-16700
>                 URL: https://issues.apache.org/jira/browse/ARROW-16700
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: C++, R
>            Reporter: Jonathan Keane
>            Priority: Blocker
>             Fix For: 9.0.0, 8.0.1
>
>
> When summarizing a whole dataset (without group_by) with an aggregate, and 
> summarizing a partitioned column, arrow returns wrong data:
> {code:r}
> library(arrow, warn.conflicts = FALSE)
> library(dplyr, warn.conflicts = FALSE)
> df <- expand.grid(
>   some_nulls = c(0L, 1L, 2L),
>   year = 2010:2023,
>   month = 1:12,
>   day = 1:30
> )
> path <- tempfile()
> dir.create(path)
> write_dataset(df, path, partitioning = c("year", "month"))
> ds <- open_dataset(path)
> # with arrow the mins/maxes are off for partitioning columns
> ds %>%
>   summarise(n = n(), min_year = min(year), min_month = min(month),
>             min_day = min(day), max_year = max(year), max_month = max(month),
>             max_day = max(day)) %>%
>   collect()
> #> # A tibble: 1 × 7
> #>       n min_year min_month min_day max_year max_month max_day
> #>   <int>    <int>     <int>   <int>    <int>     <int>   <int>
> #> 1 15120     2023         1       1     2023        12      30
> # compared to what we get with dplyr
> df %>%
>   summarise(n = n(), min_year = min(year), min_month = min(month),
>             min_day = min(day), max_year = max(year), max_month = max(month),
>             max_day = max(day)) %>%
>   collect()
> #>       n min_year min_month min_day max_year max_month max_day
> #> 1 15120     2010         1       1     2023        12      30
> # even min alone is off:
> ds %>%
>   summarise(min_year = min(year)) %>% 
>   collect()
> #> # A tibble: 1 × 1
> #>   min_year
> #>      <int>
> #> 1     2016
>   
> # but non-partitioning columns are fine:
> ds %>%
>   summarise(min_day = min(day)) %>% 
>   collect()
> #> # A tibble: 1 × 1
> #>   min_day
> #>     <int>
> #> 1       1
>   
>   
> # But with a group_by, this seems ok
> ds %>%
>   group_by(some_nulls) %>%
>   summarise(min_year = min(year)) %>% 
>   collect()
> #> # A tibble: 3 × 2
> #>   some_nulls min_year
> #>        <int>    <int>
> #> 1          0     2010
> #> 2          1     2010
> #> 3          2     2010
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
