This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-site.git
The following commit(s) were added to refs/heads/main by this push:
new 9b0e78ac9f6 [Website] Aggregating Millions of Groups Fast in Apache
Arrow DataFusion 28.0.0 (#386)
9b0e78ac9f6 is described below
commit 9b0e78ac9f6bd1ef43abbdcc796fe3374f8b1f2f
Author: Andrew Lamb <[email protected]>
AuthorDate: Mon Aug 14 06:36:13 2023 -0400
[Website] Aggregating Millions of Groups Fast in Apache Arrow DataFusion
28.0.0 (#386)
Closes https://github.com/apache/arrow-datafusion/issues/6988
**Note**: This describes work @tustvold @Dandandan and I did in
DataFusion 28.0.0. This content was originally published on the
[InfluxData
Blog](https://www.influxdata.com/blog/aggregating-millions-groups-fast-apache-arrow-datafusion/)
but since it is generally applicable to Apache Arrow DataFusion I would
like to syndicate it here because:
1. This is a forum where the community can comment / keep it up to date
via PR
2. It is hosted on a platform with a different lifetime than a company
blog
This is the same model we followed with
https://arrow.apache.org/blog/2022/12/26/querying-parquet-with-millisecond-latency/
which was also republished on the arrow blog after the InfluxData blog.
It also gives me an example to use my original ASCII art diagrams :)
---
_posts/2023-08-05-datafusion_fast_grouping.md | 405 ++++++++++++++++++++++++++
assets/datafusion_fast_grouping/full.png | Bin 0 -> 50146 bytes
assets/datafusion_fast_grouping/summary.png | Bin 0 -> 52822 bytes
3 files changed, 405 insertions(+)
diff --git a/_posts/2023-08-05-datafusion_fast_grouping.md
b/_posts/2023-08-05-datafusion_fast_grouping.md
new file mode 100644
index 00000000000..f8b5a50e5ef
--- /dev/null
+++ b/_posts/2023-08-05-datafusion_fast_grouping.md
@@ -0,0 +1,405 @@
+---
+layout: post
+title: "Aggregating Millions of Groups Fast in Apache Arrow DataFusion 28.0.0"
+date: "2023-08-05 00:00:00"
+author: alamb, Dandandan, tustvold
+categories: [release]
+---
+
+<!--
+{% comment %}
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+{% endcomment %}
+-->
+
+<!--- Converted from Google Docs using
https://www.buymeacoffee.com/docstomarkdown --->
+
+## Aggregating Millions of Groups Fast in Apache Arrow DataFusion
+
+Andrew Lamb, Daniël Heres, Raphael Taylor-Davies
+
+*Note: this article was originally published on the [InfluxData
Blog](https://www.influxdata.com/blog/aggregating-millions-groups-fast-apache-arrow-datafusion)*
+
+
+## TLDR
+
+Grouped aggregations are a core part of any analytic tool, creating
understandable summaries of huge data volumes. [Apache Arrow
DataFusion](https://arrow.apache.org/datafusion/)’s parallel aggregation
capability is 2-3x faster in the [newly released version
`28.0.0`](https://crates.io/crates/datafusion/28.0.0) for queries with a large
number (10,000 or more) of groups.
+
+Improving aggregation performance matters to all users of DataFusion. For
example, both InfluxDB, a [time series data
platform](https://github.com/influxdata/influxdb) and Coralogix, a [full-stack
observability](https://coralogix.com/?utm_source=InfluxDB&utm_medium=Blog&utm_campaign=organic)
platform, aggregate vast amounts of raw data to monitor and create insights
for our customers. Improving DataFusion’s performance lets us provide better
user experiences by generating insights faster [...]
+
+With the new optimizations, DataFusion’s grouping speed is now close to
DuckDB, a system that regularly reports
[great](https://duckdblabs.github.io/db-benchmark/)
[grouping](https://duckdb.org/2022/03/07/aggregate-hashtable.html#experiments)
benchmark performance numbers. Figure 1 contains a representative sample of
[ClickBench](https://github.com/ClickHouse/ClickBench/tree/main) on a single
Parquet file, and the full results are at the end of this article.
+
+<img src="{{ site.baseurl }}/assets/datafusion_fast_grouping/summary.png"
width="700">
+
+**Figure 1**: Query performance for ClickBench queries 16, 17, 18
and 19 on a single Parquet file for DataFusion `27.0.0`, DataFusion `28.0.0`
and DuckDB `0.8.1`.
+
+
+## Introduction to high cardinality grouping
+
+Aggregation is a fancy word for computing summary statistics across many rows
that have the same value in one or more columns. We call the rows with the same
values _groups_ and “high cardinality” means there are a large number of
distinct groups in the dataset. At the time of writing, a “large” number of
groups in analytic engines is around 10,000.
+
+For example, the [ClickBench](https://github.com/ClickHouse/ClickBench) _hits_
dataset contains 100 million anonymized user clicks across a set of websites.
ClickBench Query 17 is:
+
+
+```sql
+SELECT "UserID", "SearchPhrase", COUNT(*)
+FROM hits
+GROUP BY "UserID", "SearchPhrase"
+ORDER BY COUNT(*) DESC
+LIMIT 10;
+```
+
+In English, this query finds “the top ten (user, search phrase) combinations,
across all clicks” and produces the following results (there are no search
phrases for the top ten users):
+
+
+```text
++---------------------+--------------+-----------------+
+| UserID | SearchPhrase | COUNT(UInt8(1)) |
++---------------------+--------------+-----------------+
+| 1313338681122956954 | | 29097 |
+| 1907779576417363396 | | 25333 |
+| 2305303682471783379 | | 10597 |
+| 7982623143712728547 | | 6669 |
+| 7280399273658728997 | | 6408 |
+| 1090981537032625727 | | 6196 |
+| 5730251990344211405 | | 6019 |
+| 6018350421959114808 | | 5990 |
+| 835157184735512989 | | 5209 |
+| 770542365400669095 | | 4906 |
++---------------------+--------------+-----------------+
+```
+
+The ClickBench dataset contains
+
+* 99,997,497 total rows[^1]
+* 17,630,976 different users (distinct UserIDs)[^2]
+* 6,019,103 different search phrases[^3]
+* 24,070,560 distinct combinations[^4] of (UserID, SearchPhrase)
+
+Thus, to answer the query, DataFusion must map each of the 100M different
input rows into one of the **24 million different groups**, and keep count of
how many such rows there are in each group.
+
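As a rough back-of-the-envelope check, using only the counts listed above, the average group receives just a handful of rows, so any per-group overhead is amortized over very little work:

```python
# Counts taken from the list above
total_rows = 99_997_497
distinct_groups = 24_070_560

# Average number of input rows that land in each (UserID, SearchPhrase) group
rows_per_group = total_rows / distinct_groups
print(f"{rows_per_group:.2f} rows per group on average")  # 4.15
```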
+
+## The solution
+
+Like most concepts in databases and other analytic systems, the basic ideas of
this algorithm are straightforward and taught in introductory computer science
courses. You could compute the query with a program such as this[^5]:
+
+```python
+import pandas as pd
+from collections import defaultdict
+from operator import itemgetter
+
+# read file
+hits = pd.read_parquet('hits.parquet', engine='pyarrow')
+
+# build groups
+counts = defaultdict(int)
+for index, row in hits.iterrows():
+    group = (row['UserID'], row['SearchPhrase'])
+    # update the dict entry for the corresponding key
+    counts[group] += 1
+
+# Print the top 10 values
+print(dict(sorted(counts.items(), key=itemgetter(1), reverse=True)[:10]))
+```
+
+This approach, while simple, is both slow and very memory inefficient. It
requires over 40 seconds to compute the results for less than 1% of the
dataset[^6]. Both DataFusion `28.0.0` and DuckDB `0.8.1` compute results in
under 10 seconds for the _entire_ dataset.
+
+To answer this query quickly and efficiently, you have to write your code such
that it:
+
+
+1. Keeps all cores busy aggregating via parallelized computation
+2. Updates aggregate values quickly, using vectorizable loops that are easy
for compilers to translate into the high performance
[SIMD](https://en.wikipedia.org/wiki/Single_instruction,_multiple_data)
instructions available in modern CPUs.
+
+The rest of this article explains how grouping works in DataFusion and the
improvements we made in `28.0.0`.
+
+
+### Two phase parallel partitioned grouping
+
+Both DataFusion `27.0.0` and `28.0.0` use state-of-the-art, two phase parallel
hash partitioned grouping, similar to other high-performance vectorized engines
like [DuckDB’s Parallel Grouped
Aggregates](https://duckdb.org/2022/03/07/aggregate-hashtable.html). In
pictures this looks like:
+
+
+```text
+ ▲ ▲
+ │ │
+ │ │
+ │ │
+┌───────────────────────┐ ┌───────────────────┐
+│ GroupBy │ │ GroupBy │ Step 4
+│ (Final) │ │ (Final) │
+└───────────────────────┘ └───────────────────┘
+ ▲ ▲
+ │ │
+ └────────────┬───────────┘
+ │
+ │
+ ┌─────────────────────────┐
+ │ Repartition │ Step 3
+ │ HASH(x) │
+ └─────────────────────────┘
+ ▲
+ │
+ ┌────────────┴──────────┐
+ │ │
+ │ │
+ ┌────────────────────┐ ┌─────────────────────┐
+ │ GroupBy │ │ GroupBy │ Step 2
+ │ (Partial) │ │ (Partial) │
+ └────────────────────┘ └─────────────────────┘
+ ▲ ▲
+ ┌──┘ └─┐
+ │ │
+ .─────────. .─────────.
+ ,─' '─. ,─' '─.
+; Input : ; Input : Step 1
+: Stream 1 ; : Stream 2 ;
+ ╲ ╱ ╲ ╱
+ '─. ,─' '─. ,─'
+ `───────' `───────'
+```
+
+**Figure 2**: Two phase repartitioned grouping: data flows from bottom
(source) to top (results) in two phases. First (Steps 1 and 2), each core reads
the data into a core-specific hash table, computing intermediate aggregates
without any cross-core coordination. Then (Steps 3 and 4) DataFusion divides
the data (“repartitions”) into distinct subsets by group value, and each subset
is sent to a specific core which computes the final aggregate.
+
+The two phases are critical for keeping cores busy in a multi-core system.
Both phases use the same hash table approach (explained in the next section),
but differ in how the groups are distributed and the partial results emitted
from the accumulators. The first phase aggregates data as soon as possible
after it is produced. However, as shown in Figure 2, the groups can be anywhere
in any input, so the same group is often found on many different cores. The
second phase uses a hash functi [...]
+
+```text
+ ┌─────┐ ┌─────┐
+ │ 1 │ │ 3 │
+ │ 2 │ │ 4 │ 2. After Repartitioning: each
+ └─────┘ └─────┘ group key appears in exactly
+ ┌─────┐ ┌─────┐ one partition
+ │ 1 │ │ 3 │
+ │ 2 │ │ 4 │
+ └─────┘ └─────┘
+
+─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
+
+ ┌─────┐ ┌─────┐
+ │ 2 │ │ 2 │
+ │ 1 │ │ 2 │
+ │ 3 │ │ 3 │
+ │ 4 │ │ 1 │
+ └─────┘ └─────┘ 1. Input Stream: groups
+ ... ... values are spread
+ ┌─────┐ ┌─────┐ arbitrarily over each input
+ │ 1 │ │ 4 │
+ │ 4 │ │ 3 │
+ │ 1 │ │ 1 │
+ │ 4 │ │ 3 │
+ │ 3 │ │ 2 │
+ │ 2 │ │ 2 │
+ │ 2 │ └─────┘
+ └─────┘
+
+ Core A Core B
+
+```
+
+**Figure 3**: Group value distribution across 2 cores during aggregation
phases. In the first phase, every group value `1`, `2`, `3`, `4`, is present in
the input stream processed by each core. In the second phase, after
repartitioning, the group values `1` and `2` are processed by core A, and
values `3` and `4` are processed only by core B.
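+
The flow in Figures 2 and 3 can be sketched in a few lines of Python. This is a sequential illustration only, not DataFusion's implementation: the function names are hypothetical, each "core" is simply a list, and the input values are the ones drawn in Figure 3 (ignoring the elided rows).

```python
from collections import Counter

def partial_aggregate(stream):
    """Phase 1: count each group key within one input stream."""
    return Counter(stream)

def repartition(partials, num_partitions):
    """Route every (key, partial count) pair to the partition chosen by hash(key)."""
    partitions = [[] for _ in range(num_partitions)]
    for partial in partials:
        for key, count in partial.items():
            partitions[hash(key) % num_partitions].append((key, count))
    return partitions

def final_aggregate(partition):
    """Phase 2: merge the partial counts that were routed to one partition."""
    totals = Counter()
    for key, count in partition:
        totals[key] += count
    return totals

def two_phase_count(streams, num_partitions=2):
    partials = [partial_aggregate(s) for s in streams]
    partitions = repartition(partials, num_partitions)
    return [final_aggregate(p) for p in partitions]

# Group values as drawn in Figure 3
core_a = [2, 1, 3, 4, 1, 4, 1, 4, 3, 2, 2]
core_b = [2, 2, 3, 1, 4, 3, 1, 3, 2, 2]
finals = two_phase_count([core_a, core_b])
for i, result in enumerate(finals):
    print(f"partition {i}: {dict(result)}")
```

Because the partition is chosen purely from the hash of the key, every group key ends up in exactly one final partition, so the final results can simply be concatenated. Which keys land in which partition depends on the hash function, so the assignment need not match the figure.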
+
+There are some additional subtleties in the [DataFusion
implementation](https://github.com/apache/arrow-datafusion/blob/main/datafusion/core/src/physical_plan/aggregates/row_hash.rs)
not mentioned above due to space constraints, such as:
+
+1. The policy of when to emit data from the first phase’s hash table (e.g.
because the data is partially sorted)
+2. Handling specific filters per aggregate (due to the `FILTER` SQL clause)
+3. Data types of intermediate values (which may not be the same as the final
output for some aggregates such as `AVG`).
+4. Action taken when memory use exceeds its budget.
+
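To illustrate the third point, here is a minimal sketch of why `AVG` needs a different intermediate type than its output: the partial phase must pass along a (sum, count) pair, because averaging per-core averages gives the wrong answer. The `AvgState` class is hypothetical and only meant to show the idea.

```python
from dataclasses import dataclass

@dataclass
class AvgState:
    """Intermediate AVG state: a running sum and count, not an average."""
    total: float = 0.0
    count: int = 0

    def update(self, values):
        """Partial phase: fold raw input values into the state."""
        for v in values:
            self.total += v
            self.count += 1

    def merge(self, other):
        """Final phase: combine a partial state produced by another core."""
        self.total += other.total
        self.count += other.count

    def evaluate(self):
        """Produce the final AVG value (None for an empty group)."""
        return self.total / self.count if self.count else None

core_a, core_b = AvgState(), AvgState()
core_a.update([1.0, 2.0, 3.0])
core_b.update([10.0])

final = AvgState()
final.merge(core_a)
final.merge(core_b)
print(final.evaluate())  # 4.0
```

Averaging the two per-core averages (2.0 and 10.0) would instead give 6.0.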
+
+### Hash grouping
+
+DataFusion queries can compute many different aggregate functions for each
group, both [built
in](https://arrow.apache.org/datafusion/user-guide/sql/aggregate_functions.html)
and/or user defined
[`AggregateUDFs`](https://docs.rs/datafusion/latest/datafusion/logical_expr/struct.AggregateUDF.html).
The state for each aggregate function, called an <em>accumulator</em>, is
tracked with a hash table (DataFusion uses the excellent
[HashBrown](https://docs.rs/hashbrown/latest/hashbrown/index.ht [...]
+
+
+### Hash grouping in `27.0.0`
+
+As shown in Figure 4, DataFusion `27.0.0` stores the data in a
[`GroupState`](https://github.com/apache/arrow-datafusion/blob/4d93b6a3802151865b68967bdc4c7d7ef425b49a/datafusion/core/src/physical_plan/aggregates/utils.rs#L38-L50)
structure which, unsurprisingly, tracks the state for each group. The state
for each group consists of:
+
+1. The actual value of the group columns, in [Arrow
Row](https://docs.rs/arrow-row/latest/arrow_row/index.html) format.
+2. In-progress accumulations (e.g. the running counts for the `COUNT`
aggregate) for each group, in one of two possible formats
([`Accumulator`](https://github.com/apache/arrow-datafusion/blob/a6dcd943051a083693c352c6b4279156548490a0/datafusion/expr/src/accumulator.rs#L24-L49)
or
[`RowAccumulator`](https://github.com/apache/arrow-datafusion/blob/a6dcd943051a083693c352c6b4279156548490a0/datafusion/physical-expr/src/aggregate/row_accumulator.rs#L26-L46)).
+3. Scratch space for tracking which rows match each aggregate in each batch.
+
+
+
+```text
+ ┌──────────────────────────────────────┐
+ │ │
+ │ ... │
+ │ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ │
+ │ ┃ ┃ │
+ ┌─────────┐ │ ┃ ┌──────────────────────────────┐ ┃ │
+ │ │ │ ┃ │group values: OwnedRow │ ┃ │
+ │ ┌─────┐ │ │ ┃ └──────────────────────────────┘ ┃ │
+ │ │ 5 │ │ │ ┃ ┌──────────────────────────────┐ ┃ │
+ │ ├─────┤ │ │ ┃ │Row accumulator: │ ┃ │
+ │ │ 9 │─┼────┐ │ ┃ │Vec<u8> │ ┃ │
+ │ ├─────┤ │ │ │ ┃ └──────────────────────────────┘ ┃ │
+ │ │ ... │ │ │ │ ┃ ┌──────────────────────┐ ┃ │
+ │ ├─────┤ │ │ │ ┃ │┌──────────────┐ │ ┃ │
+ │ │ 1 │ │ │ │ ┃ ││Accumulator 1 │ │ ┃ │
+ │ ├─────┤ │ │ │ ┃ │└──────────────┘ │ ┃ │
+ │ │ ... │ │ │ │ ┃ │┌──────────────┐ │ ┃ │
+ │ └─────┘ │ │ │ ┃ ││Accumulator 2 │ │ ┃ │
+ │ │ │ │ ┃ │└──────────────┘ │ ┃ │
+ └─────────┘ │ │ ┃ │ Box<dyn Accumulator> │ ┃ │
+ Hash Table │ │ ┃ └──────────────────────┘ ┃ │
+ │ │ ┃ ┌─────────────────────────┐ ┃ │
+ │ │ ┃ │scratch indices: Vec<u32>│ ┃ │
+ │ │ ┃ └─────────────────────────┘ ┃ │
+ │ │ ┃ GroupState ┃ │
+ └─────▶ │ ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ │
+ │ │
+ Hash table tracks an │ ... │
+ index into group_states │ │
+ └──────────────────────────────────────┘
+ group_states: Vec<GroupState>
+
+ There is one GroupState PER GROUP
+
+```
+
+
+**Figure 4**: Hash group operator structure in DataFusion `27.0.0`. A hash
table maps each group to a GroupState which contains all the per-group states.
+
+To compute the aggregate, DataFusion performs the following steps for each
input batch:
+
+
+
+1. Calculate hash using [efficient vectorized
code](https://github.com/apache/arrow-datafusion/blob/a6dcd943051a083693c352c6b4279156548490a0/datafusion/physical-expr/src/hash_utils.rs#L264-L307),
specialized for each data type.
+2. Determine group indexes for each input row using the hash table (creating
new entries for newly seen groups).
+3. [Update Accumulators for each group that had input
rows,](https://github.com/apache/arrow-datafusion/blob/4ab8be57dee3bfa72dd105fbd7b8901b873a4878/datafusion/core/src/physical_plan/aggregates/row_hash.rs#L562-L602)
assembling the rows into a contiguous range for the vectorized accumulator if
there are a sufficient number of them.
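+
The steps above can be sketched roughly as follows. This is a simplified Python model, not the actual Rust code: `PerGroupAggregator` and `CountAccumulator` are hypothetical names, a plain `dict` stands in for the hash table (and does the hashing), and only `COUNT` is supported.

```python
class CountAccumulator:
    """Hypothetical per-group accumulator: one instance exists for every group."""
    def __init__(self):
        self.count = 0

    def update_batch(self, num_rows):
        self.count += num_rows

class PerGroupAggregator:
    def __init__(self):
        self.table = {}          # group key -> index into group_states
        self.group_states = []   # one (key, accumulator) pair per group

    def process_batch(self, keys):
        # Steps 1 and 2: hash each key (done inside the dict) and find or
        # create its group index, remembering which rows belong to each group.
        rows_per_group = {}
        for row, key in enumerate(keys):
            idx = self.table.get(key)
            if idx is None:
                idx = len(self.group_states)
                self.table[key] = idx
                self.group_states.append((key, CountAccumulator()))
            rows_per_group.setdefault(idx, []).append(row)
        # Step 3: update each group that had input rows in this batch
        for idx, rows in rows_per_group.items():
            self.group_states[idx][1].update_batch(len(rows))

    def results(self):
        return {key: acc.count for key, acc in self.group_states}

agg = PerGroupAggregator()
agg.process_batch(["a", "b", "a"])
agg.process_batch(["c", "a"])
print(agg.results())  # {'a': 3, 'b': 1, 'c': 1}
```

Note that every new group allocates its own accumulator object and its own scratch list of row indexes.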
+
+DataFusion also stores the hash values in the table to avoid potentially
costly hash recomputation when resizing the hash table.
+
+This scheme works very well for a relatively small number of distinct groups:
all accumulators are efficiently updated with large contiguous batches of rows.
+
+However, this scheme is not ideal for high cardinality grouping due to:
+
+1. **Multiple allocations per group** for the group value row format, as well
as for the `RowAccumulator`s and each `Accumulator`. The `Accumulator` may
have additional allocations within it as well.
+2. **Non-vectorized updates:** Accumulator updates often fall back to a slower
non-vectorized form because the number of distinct groups is large (and thus
number of values per group is small) in each input batch.
+
+
+### Hash grouping in `28.0.0`
+
+For `28.0.0`, we rewrote the core group by implementation following
traditional system optimization principles: fewer allocations, type
specialization, and aggressive vectorization.
+
+DataFusion `28.0.0` uses the same RawTable and still stores group indexes. The
major differences, as shown in Figure 5, are:
+
+
+1. Group values are stored either
+ 1. Inline in the `RawTable` (for single columns of primitive types), where
the conversion to Row format costs more than its benefit
+ 2. In a separate
[Rows](https://docs.rs/arrow-row/latest/arrow_row/struct.Row.html) structure
with a single contiguous allocation for all group values, rather than an
allocation per group.
+2. Accumulators manage the state for all the groups
internally, so the code to update intermediate values is a tight type
specialized loop. The new
[`GroupsAccumulator`](https://github.com/apache/arrow-datafusion/blob/a6dcd943051a083693c352c6b4279156548490a0/datafusion/physical-expr/src/aggregate/gro
[...]
+
+
+```text
+┌───────────────────────────────────┐ ┌───────────────────────┐
+│ ┌ ─ ─ ─ ─ ─ ┐ ┌─────────────────┐│ │ ┏━━━━━━━━━━━━━━━━━━━┓ │
+│ │ ││ │ ┃ ┌──────────────┐ ┃ │
+│ │ │ │ ┌ ─ ─ ┐┌─────┐ ││ │ ┃ │┌───────────┐ │ ┃ │
+│ │ X │ 5 │ ││ │ ┃ ││ value1 │ │ ┃ │
+│ │ │ │ ├ ─ ─ ┤├─────┤ ││ │ ┃ │└───────────┘ │ ┃ │
+│ │ Q │ 9 │──┼┼──┐ │ ┃ │ ... │ ┃ │
+│ │ │ │ ├ ─ ─ ┤├─────┤ ││ └──┼─╋─▶│ │ ┃ │
+│ │ ... │ ... │ ││ │ ┃ │┌───────────┐ │ ┃ │
+│ │ │ │ ├ ─ ─ ┤├─────┤ ││ │ ┃ ││ valueN │ │ ┃ │
+│ │ H │ 1 │ ││ │ ┃ │└───────────┘ │ ┃ │
+│ │ │ │ ├ ─ ─ ┤├─────┤ ││ │ ┃ │values: Vec<T>│ ┃ │
+│ Rows │ ... │ ... │ ││ │ ┃ └──────────────┘ ┃ │
+│ │ │ │ └ ─ ─ ┘└─────┘ ││ │ ┃ ┃ │
+│ ─ ─ ─ ─ ─ ─ │ ││ │ ┃ GroupsAccumulator ┃ │
+│ └─────────────────┘│ │ ┗━━━━━━━━━━━━━━━━━━━┛ │
+│ Hash Table │ │ │
+│ │ │ ... │
+└───────────────────────────────────┘ └───────────────────────┘
+ GroupState Accumulators
+
+
+Hash table value stores group_indexes One GroupsAccumulator
+and group values. per aggregate. Each
+ stores the state for
+Group values are stored either inline *ALL* groups, typically
+in the hash table or in a single using a native Vec<T>
+allocation using the arrow Row format
+```
+
+**Figure 5**: Hash group operator structure in DataFusion `28.0.0`. Group
values are stored either directly in the hash table, or in a single allocation
using the arrow Row format. The hash table contains group indexes. A single
`GroupsAccumulator` stores the per-aggregate state for _all_ groups.
+
+This new structure improves performance significantly for high cardinality
groups due to:
+
+
+
+1. **Reduced allocations**: There are no longer any individual allocations per
group.
+2. **Contiguous native accumulator states**: Type-specialized accumulators
store the values for all groups in a single contiguous allocation using a [Rust
Vec<T>](https://doc.rust-lang.org/std/vec/struct.Vec.html) of some native
type.
+3. **Vectorized state update**: The inner aggregate update loops, which are
type-specialized and in terms of native `Vec`s, are well-vectorized by the Rust
compiler (thanks [LLVM](https://llvm.org/)!).
+
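A minimal Python sketch of this layout, assuming a `COUNT` aggregate, might look like the following. The class and function names are hypothetical and only loosely modeled on the structure in Figure 5: a `dict` stands in for the hash table, and the standard library `array` type stands in for a native `Vec<i64>`.

```python
from array import array

class CountGroupsAccumulator:
    """Hypothetical accumulator holding the counts for *all* groups."""
    def __init__(self):
        self.counts = array("q")  # one contiguous buffer of 64-bit integers

    def update_batch(self, group_indexes, total_num_groups):
        # Grow the buffer once per batch so every group index has a slot
        missing = total_num_groups - len(self.counts)
        if missing > 0:
            self.counts.extend([0] * missing)
        # Tight loop over native integers, no per-group objects involved
        for idx in group_indexes:
            self.counts[idx] += 1

def assign_group_indexes(keys, table):
    """Map each key to a dense group index, adding new groups as they appear."""
    return [table.setdefault(key, len(table)) for key in keys]

table = {}
acc = CountGroupsAccumulator()
for batch in (["a", "b", "a"], ["c", "a"]):
    indexes = assign_group_indexes(batch, table)
    acc.update_batch(indexes, len(table))

print({key: acc.counts[idx] for key, idx in table.items()})  # {'a': 3, 'b': 1, 'c': 1}
```

Adding a new group only appends a slot to an existing buffer. In Rust, a loop of this shape over a `Vec` of a native type is the kind of code the compiler can vectorize, which a Python loop cannot show.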
+
+### Notes
+
+Some vectorized grouping implementations store the accumulator state row-wise
directly in the hash table, which often uses modern CPU caches efficiently.
Managing accumulator state in columnar fashion may sacrifice some cache
locality; however, it ensures the size of the hash table remains small, even
when there are large numbers of groups and aggregates, making it easier for the
compiler to vectorize the accumulator update.
+
+Depending on the cost of recomputing hash values, DataFusion `28.0.0` may or
may not store the hash values in the table. This optimizes the tradeoff between
the cost of computing the hash value (which is expensive for strings, for
example) vs. the cost of storing it in the hash table.
+
+One subtlety that arises from pushing state updates into GroupsAccumulators is
that each accumulator must handle similar variations with/without filtering and
with/without nulls in the input. DataFusion `28.0.0` uses a templated
[`NullState`](https://github.com/apache/arrow-datafusion/blob/a6dcd943051a083693c352c6b4279156548490a0/datafusion/physical-expr/src/aggregate/groups_accumulator/accumulate.rs#L28-L54)
which encapsulates these common patterns across accumulators.
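+
To give a flavor of what each accumulator has to handle, here is a hedged Python sketch of a `SUM` update that respects both nulls and an optional filter. The `accumulate` helper and its parameters are hypothetical and are not the `NullState` API. For brevity it handles all four combinations in one loop, whereas the point of sharing this logic is to avoid rewriting it in every accumulator.

```python
from array import array

def accumulate(sums, seen, group_indexes, values, opt_filter=None):
    """Add each value to its group's sum, skipping nulls and filtered-out rows.

    `values` uses None for null; `opt_filter` is an optional list of booleans.
    """
    for row, (idx, value) in enumerate(zip(group_indexes, values)):
        if opt_filter is not None and not opt_filter[row]:
            continue  # row excluded by the FILTER clause
        if value is None:
            continue  # null inputs do not contribute to the aggregate
        sums[idx] += value
        seen[idx] = True

sums = array("q", [0, 0, 0])
seen = [False, False, False]
accumulate(
    sums, seen,
    group_indexes=[0, 1, 0, 2, 1],
    values=[5, None, 7, 9, 3],
    opt_filter=[True, True, True, False, True],
)
# A group with no accepted input reports null (None) rather than 0
print([s if ok else None for s, ok in zip(sums, seen)])  # [12, 3, None]
```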
+
+The code structure is heavily influenced by the fact that DataFusion is implemented
using [Rust](https://www.rust-lang.org/), a new(ish) systems programming
language focused on speed and safety. Rust heavily discourages many of the
traditional pointer casting “tricks” used in C/C++ hash grouping
implementations. The DataFusion aggregation code is almost entirely
[`safe`](https://doc.rust-lang.org/nomicon/meet-safe-and-unsafe.html#:~:text=Safe%20Rust%20is%20the%20true,Undefined%20Behavior%20(a
[...]
+
+
+## ClickBench results
+
+The full results of running the
[ClickBench](https://github.com/ClickHouse/ClickBench/tree/main) queries
against the single Parquet file with DataFusion `27.0.0`, DataFusion `28.0.0`,
and DuckDB `0.8.1` are below. These numbers were collected on a GCP `e2-standard-8`
machine with 8 cores and 32 GB of RAM, using the scripts
[here](https://github.com/alamb/datafusion-duckdb-benchmark).
+
+As the industry moves towards data systems assembled from components, it is
increasingly important that they exchange data using open standards such as
[Apache Arrow](https://arrow.apache.org/) and
[Parquet](https://parquet.apache.org/) rather than custom storage and in-memory
formats. Thus, this benchmark uses a single input Parquet file representative
of many DataFusion users and aligned with the current trend in analytics of
avoiding a costly load/transformation into a custom storage [...]
+
+DataFusion now reaches near-DuckDB speeds querying Parquet data. While we
don’t plan to engage in a benchmarking shootout with a team that literally
wrote [Fair Benchmarking Considered
Difficult](https://dl.acm.org/doi/abs/10.1145/3209950.3209955), hopefully
everyone can agree that DataFusion `28.0.0` is a significant improvement.
+
+<img src="{{ site.baseurl }}/assets/datafusion_fast_grouping/full.png"
width="700">
+
+**Figure 6**: Performance of DataFusion `27.0.0`, DataFusion `28.0.0`, and
DuckDB `0.8.1` on all 43 ClickBench queries against a single `hits.parquet`
file. Lower is better.
+
+
+### Notes
+
+DataFusion `27.0.0` was not able to run several queries due to either planner
bugs (Q9, Q11, Q12, Q14) or running out of memory (Q33). DataFusion `28.0.0`
solves those issues.
+
+DataFusion is faster than DuckDB for queries 21 and 22, likely due to optimized
implementations of string pattern matching.
+
+
+## Conclusion: performance matters
+
+Improving aggregation performance by more than a factor of two allows
developers building products and projects with DataFusion to spend more time on
value-added domain specific features. We believe building systems with
DataFusion is much faster than trying to build something similar from scratch.
DataFusion increases productivity because it eliminates the need to rebuild
well-understood, but costly to implement, analytic database technology. While
we’re pleased with the improvements in [...]
+
+
+## Acknowledgments
+
+DataFusion is a [community
effort](https://arrow.apache.org/datafusion/contributor-guide/communication.html)
and this work would not have been possible without contributions from many in the
community. A special shout out to [sunchao](https://github.com/sunchao),
[yjshen](https://github.com/jyshen),
[yahoNanJing](https://github.com/yahoNanJing),
[mingmwang](https://github.com/mingmwang),
[ozankabak](https://github.com/ozankabak),
[mustafasrepo](https://github.com/mustafasrepo), and everyone else who [...]
+
+
+## About DataFusion
+
+[Apache Arrow DataFusion](https://arrow.apache.org/datafusion/) is an
extensible query engine and database toolkit, written in
[Rust](https://www.rust-lang.org/), that uses [Apache
Arrow](https://arrow.apache.org/) as its in-memory format. DataFusion, along
with [Apache Calcite](https://calcite.apache.org/), Facebook’s
[Velox](https://github.com/facebookincubator/velox), and similar technology are
part of the next generation “[Deconstructed
Database](https://www.usenix.org/publications/l [...]
+
+
+<!-- Footnotes themselves at the bottom. -->
+## Notes
+
+[^1]: `SELECT COUNT(*) FROM 'hits.parquet';`
+
+[^2]: `SELECT COUNT(DISTINCT "UserID") as num_users FROM 'hits.parquet';`
+
+[^3]: `SELECT COUNT(DISTINCT "SearchPhrase") as num_phrases FROM
'hits.parquet';`
+
+[^4]: `SELECT COUNT(*) FROM (SELECT DISTINCT "UserID", "SearchPhrase" FROM
'hits.parquet')`
+
+[^5]: Full script at
[hash.py](https://github.com/alamb/datafusion-duckdb-benchmark/blob/main/hash.py)
+
+[^6]:
[hits_0.parquet](https://datasets.clickhouse.com/hits_compatible/athena_partitioned/hits_%7B%7D.parquet),
one of the files from the partitioned ClickBench dataset, which has `100,000`
rows and is 117 MB in size. The entire dataset has `100,000,000` rows in a
single 14 GB Parquet file. The script did not complete on the entire dataset
after 40 minutes, and used 212 GB RAM at peak.
diff --git a/assets/datafusion_fast_grouping/full.png
b/assets/datafusion_fast_grouping/full.png
new file mode 100644
index 00000000000..9f07b04bc65
Binary files /dev/null and b/assets/datafusion_fast_grouping/full.png differ
diff --git a/assets/datafusion_fast_grouping/summary.png
b/assets/datafusion_fast_grouping/summary.png
new file mode 100644
index 00000000000..f92df82d20e
Binary files /dev/null and b/assets/datafusion_fast_grouping/summary.png differ