[ https://issues.apache.org/jira/browse/SPARK-55869?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Scott Schenkein updated SPARK-55869:
------------------------------------
Description:
h2. Problem
DataSource V2 predicate pushdown is limited to a fixed, closed set of
expression types hardcoded in {{{}V2ExpressionBuilder{}}}. Data source authors
who need to push predicates involving:
# *Custom operators* (e.g. {{{}my_col indexquery 'param'{}}}, {{{}my_boolean_function(col1, col2, 'p'){}}})
# *Builtin Spark expressions not in the pushdown whitelist* (e.g. {{{}RLIKE{}}}, {{{}LIKE{}}}, {{{}ILIKE{}}})
are forced to resort to fragile workarounds: intercepting the logical plan
via {{SparkSessionExtensions}} injected rules, using thread-local state to
smuggle filter information past the optimizer, and effectively reimplementing
pushdown outside Spark's designed architecture.
These hacks are:
* Brittle across Spark versions
* Invisible to Spark's query planning (no EXPLAIN output, no metrics)
* Unable to participate in Spark's post-scan filter safety net
* Duplicative — every data source author reinvents the same machinery
h2. Proposed Solution
Three independently adoptable layers:
*Layer 1: Capability-gated builtin predicate translation*
New {{SupportsPushDownPredicateCapabilities}} interface (mix-in for
{{{}ScanBuilder{}}}) lets data sources declare which additional V2 predicate
names they support (e.g. {{{}"RLIKE"{}}}, {{{}"LIKE"{}}}).
{{V2ExpressionBuilder}} uses this set to conditionally translate builtin
Catalyst expressions for which it currently has no translation case.
Tier 1 builtins: {{{}LIKE{}}}, {{{}RLIKE{}}}, {{{}IS_NAN{}}},
{{{}ARRAY_CONTAINS{}}}, {{{}MAP_CONTAINS_KEY{}}}.
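A minimal sketch of the Layer 1 gating idea, written as self-contained Java rather than against Spark's classes. The method name {{supportedPredicateNames()}} and the {{CatalystPredicate}}, {{V2Predicate}} and {{CapabilityGatedTranslator}} types are hypothetical stand-ins, not the proposed API; the point is only that a Tier 1 builtin is translated when, and only when, the scan builder has declared that exact predicate name.

```java
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.Set;

// Hypothetical stand-in for the proposed ScanBuilder mix-in; the method name is an assumption.
interface SupportsPushDownPredicateCapabilities {
    // Additional V2 predicate names (e.g. "RLIKE", "LIKE") this source can evaluate.
    Set<String> supportedPredicateNames();
}

// Simplified Catalyst-side predicate: operator name plus rendered operands.
record CatalystPredicate(String op, List<String> operands) {}

// Simplified V2-side predicate: name plus children.
record V2Predicate(String name, List<String> children) {}

final class CapabilityGatedTranslator {
    // The Tier 1 builtins listed in the proposal.
    static final Set<String> TIER_1 =
            Set.of("LIKE", "RLIKE", "IS_NAN", "ARRAY_CONTAINS", "MAP_CONTAINS_KEY");

    private CapabilityGatedTranslator() {}

    // Returns a V2 predicate only if the builtin is gated and the scan builder opted in.
    static Optional<V2Predicate> translate(CatalystPredicate p, Object scanBuilder) {
        String name = p.op().toUpperCase(Locale.ROOT);
        if (!TIER_1.contains(name)) {
            return Optional.empty(); // outside the gated set in this sketch
        }
        if (scanBuilder instanceof SupportsPushDownPredicateCapabilities caps
                && caps.supportedPredicateNames().contains(name)) {
            return Optional.of(new V2Predicate(name, p.operands()));
        }
        return Optional.empty(); // not declared: stays a post-scan filter in Spark
    }
}
```

Returning an empty result preserves today's behavior for sources that do not opt in: the expression simply remains in Spark as a post-scan filter.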
*Layer 2: Custom predicate functions*
New {{SupportsCustomPredicates}} interface (mix-in for {{{}Table{}}}) lets
data sources declare custom boolean predicate functions with dot-qualified
canonical names (e.g. {{{}"com.mycompany.INDEXQUERY"{}}}).
A new analyzer rule ({{{}ResolveCustomPredicates{}}}) resolves these during
analysis. A new {{CustomPredicateExpression}} Catalyst node translates to a
namespaced V2 {{Predicate}} during pushdown. A safety rule
({{{}EnsureCustomPredicatesPushed{}}}) fails the query if a custom predicate
is not pushed down to the data source.
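The Layer 2 flow (declare, resolve, check) can be sketched in plain Java as follows. Everything here is an assumption made for illustration: the {{customPredicates()}} method, the arity field on {{CustomPredicateDescriptor}}, the {{CustomPredicateCall}} record (standing in for {{CustomPredicateExpression}}) and the {{CustomPredicateResolver}} helper are hypothetical, and case-insensitive name matching is a guess rather than a stated requirement.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

// Hypothetical descriptor: a dot-qualified canonical name plus an expected argument count.
record CustomPredicateDescriptor(String canonicalName, int arity) {
    CustomPredicateDescriptor {
        // Enforce the dot-qualified naming rule so names cannot collide with unqualified builtins.
        if (canonicalName == null || !canonicalName.contains(".")) {
            throw new IllegalArgumentException(
                    "Custom predicate name must be dot-qualified: " + canonicalName);
        }
    }
}

// Hypothetical stand-in for the proposed Table mix-in.
interface SupportsCustomPredicates {
    List<CustomPredicateDescriptor> customPredicates();
}

// Resolved call, standing in for the proposed CustomPredicateExpression node.
record CustomPredicateCall(String canonicalName, List<String> args) {}

final class CustomPredicateResolver {
    private CustomPredicateResolver() {}

    // Analyzer-style resolution: match the called name (case-insensitively) and the arity.
    static Optional<CustomPredicateCall> resolve(
            String name, List<String> args, SupportsCustomPredicates table) {
        for (CustomPredicateDescriptor d : table.customPredicates()) {
            if (d.canonicalName().equalsIgnoreCase(name) && d.arity() == args.size()) {
                return Optional.of(new CustomPredicateCall(d.canonicalName(), List.copyOf(args)));
            }
        }
        return Optional.empty();
    }

    // Safety check in the spirit of EnsureCustomPredicatesPushed: anything left over was not pushed.
    static void ensurePushed(List<CustomPredicateCall> notPushed) {
        if (!notPushed.isEmpty()) {
            String names = notPushed.stream()
                    .map(CustomPredicateCall::canonicalName)
                    .collect(Collectors.joining(", "));
            throw new IllegalStateException("Custom predicate(s) were not pushed down: " + names);
        }
    }
}
```

The constructor check shows one way to enforce the dot-qualified naming rule: an unqualified name such as {{INDEXQUERY}} is rejected up front, so in this sketch it can never shadow a Spark builtin.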
*Layer 3: Custom infix operator syntax*
Helper base class {{CustomOperatorParserExtension}} simplifies parser
extensions that rewrite custom infix operators (e.g. {{{}col INDEXQUERY
'param'{}}}) to function call syntax, composing cleanly with Layer 2.
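The kind of rewrite Layer 3 targets can be illustrated with a deliberately naive, text-level sketch. A real {{CustomOperatorParserExtension}} would presumably work inside Spark's parser (for example by delegating to the parser supplied through {{SparkSessionExtensions.injectParser}}) rather than on raw SQL strings; the {{InfixOperatorRewriter}} class and its regular expression are hypothetical.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical, naive text-level rewrite of `<identifier> <OPERATOR> '<literal>'`
// into `<functionName>(<identifier>, '<literal>')`. Illustration only.
final class InfixOperatorRewriter {
    private final Pattern pattern;
    private final String functionName;

    InfixOperatorRewriter(String operator, String functionName) {
        // A (possibly dotted) identifier, the operator keyword, then a single-quoted literal.
        this.pattern = Pattern.compile(
                "\\b([A-Za-z_][A-Za-z0-9_.]*)\\s+" + Pattern.quote(operator)
                        + "\\s+('(?:[^']|'')*')",
                Pattern.CASE_INSENSITIVE);
        this.functionName = functionName;
    }

    // Replaces every infix occurrence with the equivalent function call.
    String rewrite(String sql) {
        Matcher m = pattern.matcher(sql);
        StringBuilder out = new StringBuilder();
        while (m.find()) {
            String call = functionName + "(" + m.group(1) + ", " + m.group(2) + ")";
            m.appendReplacement(out, Matcher.quoteReplacement(call));
        }
        m.appendTail(out);
        return out.toString();
    }
}
```

Because it matches on text, this sketch would also rewrite occurrences inside string literals or comments and ignores more complex left-hand operands; those are exactly the cases a parser-level helper would need to handle.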
h2. Key Design Decisions
* Custom predicate names use dot-qualified canonical names to avoid collision
with Spark builtins
* {{CustomPredicateExpression}} uses {{CodegenFallback}} (not
{{{}Unevaluable{}}}) with a post-optimizer safety rule
* Layer 1 capabilities gate only predicate-level translation; scalar
expression gating is unnecessary
* Analyzer rule registered in the {{postHocResolutionRules}} batch
* Custom predicates produce a namespaced {{Predicate}} directly (no
{{BOOLEAN_EXPRESSION}} wrapper)
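On the connector side, producing a namespaced {{Predicate}} directly means a source can dispatch on the predicate name alone. The sketch below mirrors the contract of Spark's {{SupportsPushDownV2Filters}} ({{pushPredicates}} returns the predicates Spark must still evaluate after the scan; {{pushedPredicates}} reports what was accepted), but uses a hypothetical {{NamedPredicate}} record and {{IndexScanBuilderSketch}} class instead of Spark's types so that it compiles on its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a V2 Predicate: a custom predicate arrives with its
// dot-qualified canonical name as the predicate name, with no wrapper node.
record NamedPredicate(String name, List<String> children) {}

// Hypothetical scan builder mirroring the SupportsPushDownV2Filters contract.
final class IndexScanBuilderSketch {
    // Predicate names this illustrative source can evaluate itself.
    private static final Set<String> HANDLED = Set.of("com.mycompany.INDEXQUERY", "=");
    private final List<NamedPredicate> pushed = new ArrayList<>();

    // Accepts what the source handles; returns the rest for post-scan evaluation.
    NamedPredicate[] pushPredicates(NamedPredicate[] predicates) {
        List<NamedPredicate> postScan = new ArrayList<>();
        for (NamedPredicate p : predicates) {
            if (HANDLED.contains(p.name())) {
                pushed.add(p); // dispatch on the name alone, no unwrapping needed
            } else {
                postScan.add(p);
            }
        }
        return postScan.toArray(new NamedPredicate[0]);
    }

    NamedPredicate[] pushedPredicates() {
        return pushed.toArray(new NamedPredicate[0]);
    }
}
```

Under the proposal, a custom predicate left in the returned array would then be caught by {{EnsureCustomPredicatesPushed}}, while an unhandled builtin such as {{ILIKE}} would fall back to ordinary post-scan filtering.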
h2. Validation
Design validated against a production DSv2 connector (IndexTables4Spark) that
currently uses ~585 lines of ThreadLocal + WeakHashMap + logical plan
interception hacks for custom predicate pushdown.
The proposed design eliminates all of these hack patterns.
was:
h2. Problem
DataSource V2 predicate pushdown is limited to a fixed, closed set of
expression types hardcoded in {{{}V2ExpressionBuilder{}}}. Data source authors
who need to push predicates involving:
# *Custom operators* (e.g. {{{}my_col indexquery 'param'{}}},
{{{}my_boolean_function(col1, col2, 'p'){}}})
# *Builtin Spark expressions not in the pushdown whitelist* (e.g.
{{{}RLIKE{}}}, {{{}LIKE{}}}, {{{}ILIKE{}}})
are forced to resort to fragile workarounds: intercepting the logical plan
via {{SparkSessionExtensions}} injected rules, using thread-local state to
smuggle filter information past the optimizer, and
effectively reimplementing pushdown outside Spark's designed architecture.
These hacks are:
* Brittle across Spark versions
* Invisible to Spark's query planning (no EXPLAIN output, no metrics)
* Unable to participate in Spark's post-scan filter safety net
* Duplicative — every data source author reinvents the same machinery
h2. Proposed Solution
Three independently adoptable layers:
*Layer 1: Capability-gated builtin predicate translation*
New {{SupportsPushDownPredicateCapabilities}} interface (mix-in for
{{{}ScanBuilder{}}}) lets data sources declare which additional V2 predicate
names they support (e.g. {{{}"RLIKE"{}}}, {{{}"LIKE"{}}}).
{{V2ExpressionBuilder}} uses this set to conditionally translate builtin
Catalyst expressions that currently have no match case.
Tier 1 builtins: {{{}LIKE{}}}, {{{}RLIKE{}}}, {{{}IS_NAN{}}},
{{{}ARRAY_CONTAINS{}}}, {{{}MAP_CONTAINS_KEY{}}}.
*Layer 2: Custom predicate functions*
New {{SupportsCustomPredicates}} interface (mix-in for {{{}Table{}}}) lets
data sources declare custom boolean predicate functions with dot-qualified
canonical names (e.g. {{{}"com.mycompany.INDEXQUERY"{}}}).
A new analyzer rule ({{{}ResolveCustomPredicates{}}}) resolves these during
analysis. A new {{CustomPredicateExpression}} Catalyst node translates to a
namespaced V2 {{Predicate}} during pushdown. A
safety rule ({{{}EnsureCustomPredicatesPushed{}}}) fails queries if a custom
predicate wasn't pushed.
*Layer 3: Custom infix operator syntax*
Helper base class {{CustomOperatorParserExtension}} simplifies parser
extensions that rewrite custom infix operators (e.g. {{{}col INDEXQUERY
'param'{}}}) to function call syntax, composing cleanly with
Layer 2.
h2. Key Design Decisions
* Custom predicate names use dot-qualified canonical names to avoid collision
with Spark builtins
* {{CustomPredicateExpression}} uses {{CodegenFallback}} (not
{{{}Unevaluable{}}}) with a post-optimizer safety rule
* Layer 1 capabilities gate only predicate-level translation; scalar
expression gating is unnecessary
* Analyzer rule registered in {{postHocResolutionRules}} batch
* Custom predicates produce namespaced {{Predicate}} directly (no
{{BOOLEAN_EXPRESSION}} wrapper)
h2. Validation
Design validated against a production DSv2 connector (IndexTables4Spark) that
currently uses ~585 lines of ThreadLocal + WeakHashMap + logical plan
interception hacks for custom predicate pushdown.
The proposed design eliminates all of these hack patterns.
h2. Implementation Plan
||Phase||Scope||
|Phase 1|{{{}SupportsPushDownPredicateCapabilities{}}} + {{V2ExpressionBuilder}} capability gating + Tier 1 builtins|
|Phase 2|{{{}SupportsCustomPredicates{}}} + {{CustomPredicateDescriptor}} + analyzer rule + safety rule + Tier 2 builtins|
|Phase 3|{{{}CustomOperatorParserExtension{}}} helper + documentation|
|Phase 4|JDBC reference implementation|
> Extensible predicate pushdown for DataSource V2
> ------------------------------------------------
>
> Key: SPARK-55869
> URL: https://issues.apache.org/jira/browse/SPARK-55869
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 4.0.0
> Reporter: Scott Schenkein
> Priority: Major
> Labels: SQL, connector, datasource, predicate, pushdown
--
This message was sent by Atlassian Jira
(v8.20.10#820010)