[ https://issues.apache.org/jira/browse/SEDONA-156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17608875#comment-17608875 ]
RJ Marcus commented on SEDONA-156:
----------------------------------
Jia, thank you for the response! Overall it sounds like it's headed in the
right direction.
Regarding "pushed filters" and "partition filters": I thought the _pushed_
filters are the ones that will be using the bbox during the scan to skip
files? I have extracted the bbox from the Parquet footer metadata to pass
into the pushedFilters family of functions; I have largely been ignoring
partition filters for now.
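In case it's useful, here is a minimal sketch of the bbox extraction from the footer ({{readBBox}} is just an illustrative helper, and json4s is assumed for the JSON parsing; any JSON library would do):
{code:java}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

// GeoParquet stores its metadata as JSON under the "geo" key of the
// Parquet footer; each geometry column carries a "bbox" of
// [minx, miny, maxx, maxy].
def readBBox(path: String, geometryColumn: String): Option[Seq[Double]] = {
  val inputFile = HadoopInputFile.fromPath(new Path(path), new Configuration())
  val reader = ParquetFileReader.open(inputFile)
  try {
    val kv = reader.getFooter.getFileMetaData.getKeyValueMetaData
    Option(kv.get("geo")).map { geoJson =>
      implicit val formats: Formats = DefaultFormats
      (parse(geoJson) \ "columns" \ geometryColumn \ "bbox").extract[Seq[Double]]
    }
  } finally {
    reader.close()
  }
}
{code}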
I took a look at the repositories on the neapowers website (itachi, bebe,
sql-alchemy), and I am not convinced they are applicable to this scenario,
because those Expressions are all data transformations (on existing SQL
datatypes) rather than predicate filters, so they never have to worry about
being pushed down in the first place. If they did have custom predicate
filters, I think they would run into the same problem unless they stuck to the
operators that already exist in SQL syntax ( <, >, =, OR, AND, NOT, >=, <=,
IS NULL, IN, CONTAINS, etc.; see
[Filters|https://github.com/apache/spark/blob/branch-3.3/sql/catalyst/src/main/scala/org/apache/spark/sql/sources/filters.scala])
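To make that concrete, a quick test-style sketch against Spark's internal APIs shows the asymmetry:
{code:java}
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.execution.datasources.DataSourceStrategy

// A plain comparison maps onto a built-in operator, so it translates:
// Some(GreaterThan("col1", 5))
val ok = DataSourceStrategy.translateFilter('col1.int > 5, true)

// An ST_Within(...) Expression has no sources.Filter counterpart, so
// translateFilter returns None and the predicate is silently dropped
// from the pushed set.
{code}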
It's good that the ST_* functions are defined as Expressions. I _think_ we
should be able to coerce them to work in the DataSourceV2 API.
The problem I mentioned earlier is that even though the V2 API's +pushFilters+
function takes {{Seq[Expression]}}, [the function that actually pushes the
filters to the datasource is pushDataFilters|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala#L90-L95],
and that one takes {{Array[Filter]}}, which cannot be extended with new
definitions for our ST_* predicates. An ST_Within predicate makes it as far as
{{DataSourceStrategy.translateFilter}} and then fails because it can't be
translated into a {{Filter}}.
{code:java}
override def pushFilters(filters: Seq[Expression]): Seq[Expression] = {
  val (partitionFilters, dataFilters) =
    DataSourceUtils.getPartitionFiltersAndDataFilters(partitionSchema, filters)
  this.partitionFilters = partitionFilters
  this.dataFilters = dataFilters
  val translatedFilters = mutable.ArrayBuffer.empty[sources.Filter]
  for (filterExpr <- dataFilters) {
    val translated = DataSourceStrategy.translateFilter(filterExpr, true)
    if (translated.nonEmpty) {
      translatedFilters += translated.get
    }
  }
  pushedDataFilters = pushDataFilters(translatedFilters.toArray)
  dataFilters
}

override def pushedFilters: Array[Predicate] = pushedDataFilters.map(_.toV2)

/*
 * Push down data filters to the file source, so the data filters can be evaluated there to
 * reduce the size of the data to be read. By default, data filters are not pushed down.
 * File source needs to implement this method to push down data filters.
 */
protected def pushDataFilters(dataFilters: Array[Filter]): Array[Filter] = Array.empty[Filter]
{code}
So, I _think_ we can override +pushFilters+ and +pushedFilters+ so that they
still translate ordinary filters (e.g. {{col1 == 5}}) the way the current code
does, and simply ignore +pushDataFilters+, since it isn't used anywhere else.
Finally, we rewrite a handful of downstream functions in the existing
ParquetScanBuilder/ParquetFilters, which currently only deal with {{Filter}}.
I'm attempting to keep those rewrites as minimal as possible.
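To make that concrete, here is a rough sketch of what the overridden +pushFilters+ could look like. Names like {{isSpatialPredicate}} and {{pushedSpatialFilters}} are placeholders, not existing code, and this assumes a hypothetical GeoParquetScanBuilder extending the Parquet one:
{code:java}
override def pushFilters(filters: Seq[Expression]): Seq[Expression] = {
  val (partitionFilters, dataFilters) =
    DataSourceUtils.getPartitionFiltersAndDataFilters(partitionSchema, filters)
  this.partitionFilters = partitionFilters
  this.dataFilters = dataFilters

  // Set the ST_* predicates aside before translation so they are not
  // dropped; they can later be matched against each file's bbox to prune
  // the file list.
  val (spatialFilters, plainFilters) = dataFilters.partition(isSpatialPredicate)
  this.pushedSpatialFilters = spatialFilters

  // Everything else goes through the existing translation path unchanged.
  val translatedFilters = mutable.ArrayBuffer.empty[sources.Filter]
  for (filterExpr <- plainFilters) {
    val translated = DataSourceStrategy.translateFilter(filterExpr, true)
    if (translated.nonEmpty) {
      translatedFilters += translated.get
    }
  }
  pushedDataFilters = pushDataFilters(translatedFilters.toArray)

  // Returning all data filters means Spark still re-evaluates them after
  // the scan, so correctness doesn't depend on the bbox pruning being exact.
  dataFilters
}
{code}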
I'll update with more info after I've tried that.
> predicate pushdown support for GeoParquet
> -----------------------------------------
>
> Key: SEDONA-156
> URL: https://issues.apache.org/jira/browse/SEDONA-156
> Project: Apache Sedona
> Issue Type: New Feature
> Reporter: RJ Marcus
> Priority: Major
> Fix For: 1.3.0
>
>
> Support filter predicate pushdown for the new GeoParquet reader.