[
https://issues.apache.org/jira/browse/SPARK-55348?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Bruno Messias updated SPARK-55348:
----------------------------------
Description:
This proposal introduces a *family of stateful expressions* (`async_track_min`,
`async_track_max`, `async_track_...`) that prevent exponential row explosion in
recursive CTEs, for example in graph algorithms. These expressions filter
redundant rows during iteration by tracking the "best" value seen for each key.
{*}Why it works{*}: For operators such as `min`, `max`, and `first-seen`, the
result is the same regardless of processing order. Spark's distributed task
execution is actually an advantage here: zero coordination overhead, with
correctness guaranteed by the mathematical properties of these operators.
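The order-independence claim can be checked with a small sketch (plain Python, no Spark; the function name is illustrative): applying `min`-relaxations in any shuffled order always converges to the same per-key state, because `min` is commutative, associative, and idempotent.

```python
import random

def converge(updates, seed):
    """Apply (key, value) relaxations in an arbitrary order, keeping the
    minimum per key. Because min is commutative, associative, and
    idempotent, the final state does not depend on processing order."""
    best = {}
    shuffled = list(updates)
    random.Random(seed).shuffle(shuffled)
    for key, value in shuffled:
        if key not in best or value < best[key]:
            best[key] = value
    return best

updates = [("a", 3), ("b", 7), ("a", 1), ("b", 2), ("a", 5)]
results = [converge(updates, seed) for seed in range(20)]
assert all(r == {"a": 1, "b": 2} for r in results)
```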
{*}Complexity improvement{*}: Without filtering, recursive CTEs enumerate all
paths (exponential in graph density). With `async_track`, each key is accepted
at most once per iteration, bounding intermediate growth to one row per key per
iteration instead of one row per path.
h3. Motivation: Need for Efficient Recursive CTEs
The introduction of recursive CTEs in Spark was a significant step forward. I
work in financial services, where we use recursive CTEs heavily.
However, there's a hard ceiling: {*}row explosion{*}. Once graphs exceed a
certain density or depth, queries become impractical due to exponential
intermediate row growth. This forces teams to either:
- Limit query scope to small subgraphs
- Accept long runtimes and high compute costs
- Move data to external graph systems (breaking the SQL-native workflow) and
leave platforms such as Databricks
{*}Impact{*}:
* Data analysts can write graph queries without learning new frameworks
* No data movement to external graph databases
* Cost reduction and energy efficiency
I believe Databricks alone must be executing millions of SQL queries per hour
across its warehouses. Even a small percentage of those hitting row explosion
in recursive CTEs represents significant wasted compute and cost.
*The Problem today:*
Today, to implement single-source shortest paths (SSSP) in pure SQL you must
generate all candidate paths and {*}apply {{MIN(dist)}} per node at the end{*}.
Maybe I'm missing something obvious, but I haven't found a pure SQL solution
that avoids this explosion. If one exists, I'd love to learn about it.
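To make the blowup concrete, here is a toy measurement (plain Python, no SQL engine; the graph builder is illustrative): in a dense layered DAG, the number of path prefixes a naive recursive CTE materializes grows geometrically, while the number of distinct nodes, which is all a filtered iteration needs to keep, stays small.

```python
def build_layered_graph(layers, width):
    """Toy dense DAG: source 's' plus `layers` layers of `width` nodes,
    each node connected to every node in the next layer."""
    adj = {"s": [(0, j) for j in range(width)]}
    for i in range(layers):
        for j in range(width):
            adj[(i, j)] = (
                [(i + 1, k) for k in range(width)] if i < layers - 1 else []
            )
    return adj

def naive_rows(adj, node):
    """Rows a naive recursive CTE materializes from `node`: one per path
    prefix, since every extension of every path becomes a new row."""
    return sum(1 + naive_rows(adj, nxt) for nxt in adj[node])

adj = build_layered_graph(layers=4, width=5)
print(naive_rows(adj, "s"))  # 5 + 25 + 125 + 625 = 780 path-prefix rows
print(len(adj))              # vs. 21 nodes: a key-filtered iteration keeps
                             # at most one row per node per pass
```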
*Proposed Solution: Key-Value Memoization with Monotonic Operators*
Introduce expressions that implement key-value memoization with provable
convergence for monotonic operators.
{code:sql}
async_track_min(key ANY, value NUMERIC) → BOOLEAN  -- keep row if value < stored min, or key is new
async_track_max(key ANY, value NUMERIC) → BOOLEAN  -- keep row if value > stored max, or key is new
async_track_first(key ANY) → BOOLEAN               -- keep row only the first time the key is seen
{code}
- KEY identifies the entity (e.g., graph node)
- VALUE is the metric to optimize (e.g., distance)
- OPERATOR determines convergence behavior (min, max, first, blend, ...)
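As a sanity check on these semantics, a minimal single-process sketch (the class name and state layout are illustrative, not the proposed implementation; the real expressions would hold this state in a shared backend):

```python
class AsyncTracker:
    """Single-process sketch of the proposed filter semantics:
    each track_* call returns True iff the row should be kept."""

    def __init__(self):
        self._min = {}
        self._max = {}
        self._seen = set()

    def track_min(self, key, value):
        # keep row if value < stored min, or key is new
        if key not in self._min or value < self._min[key]:
            self._min[key] = value
            return True
        return False

    def track_max(self, key, value):
        # keep row if value > stored max, or key is new
        if key not in self._max or value > self._max[key]:
            self._max[key] = value
            return True
        return False

    def track_first(self, key):
        # keep row only the first time the key is seen
        if key in self._seen:
            return False
        self._seen.add(key)
        return True

t = AsyncTracker()
assert [t.track_min("n1", d) for d in (5, 3, 4)] == [True, True, False]
assert [t.track_first("n1") for _ in range(2)] == [True, False]
```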
*Proof of Concept with Redis and UDFs*
I implemented a proof-of-concept using Redis as an external state store to
validate the feasibility of this approach and demonstrate that row explosion in
recursive CTEs can be effectively avoided.
* *Repository:*
[https://github.com/devmessias/spark/tree/poc_async_track_op_expr]
* *Approach:* Redis-backed key-value state with Lua scripts for atomic
check-and-update
* *Goal:* Validate correctness and asymptotic behavior, not provide a
production-ready solution
The PoC intentionally relied on an external system to keep the implementation
minimal and focused on semantics. Despite the per-row network overhead, it
successfully demonstrated that key-based, monotonic filtering during iteration
prevents exponential row growth.
*Working proposal*
In the diagram below, the approach I implemented and tested is the global
option: an AsyncTrackCoordinator on the Driver that holds a
ConcurrentHashMap<Key, Value> and exposes an atomic compute()-style
check-and-update. It's extremely simple to implement and reason about. The
trade-offs are straightforward: per-update RPC latency and Driver memory/heap
limits. But it was the fastest way to validate the semantics and prove we can
eliminate row explosion.
!image-2026-02-03-22-25-23-874.png|width=663,height=442!
I also considered a local RocksDB-based backend, but did not investigate it
deeply. At this stage, the focus is on validating the proposal and its
semantics with the community; backend optimizations can be explored later if
there is agreement on the feature.
*References*
- [https://github.com/lsmgeb89/asynch_bellman_ford]
- [https://arxiv.org/abs/1204.6078]
- [https://www.repositorio.ufal.br/bitstream/123456789/10217/1/Modelagem%20ass%C3%ADncrona%20do%20Page%20Rank.pdf]
Remaining Estimate: 336h
Original Estimate: 336h
> async_track expressions for efficient convergent algorithms in recursive CTEs
> (avoids exponential blowup)
> ---------------------------------------------------------------------------------------------------------
>
> Key: SPARK-55348
> URL: https://issues.apache.org/jira/browse/SPARK-55348
> Project: Spark
> Issue Type: New Feature
> Components: SQL
> Affects Versions: 4.1.1
> Reporter: Bruno Messias
> Priority: Major
> Labels: CTE, performance, recursive
> Original Estimate: 336h
> Remaining Estimate: 336h
--
This message was sent by Atlassian Jira
(v8.20.10#820010)