zml1206 commented on code in PR #11153:
URL:
https://github.com/apache/incubator-gluten/pull/11153#discussion_r2567896014
##########
gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala:
##########
@@ -37,7 +37,40 @@ trait BasicScanExecTransformer extends LeafTransformSupport
with BaseDataSource
import org.apache.spark.sql.catalyst.util._
/** Returns the filters that can be pushed down to native file scan */
- def filterExprs(): Seq[Expression]
+ final def filterExprs(): Seq[Expression] = {
+ if (pushDownFilters.nonEmpty) {
+ val (_, scanFiltersNotInPushDownFilters) =
+ scanFilters.partition(pushDownFilters.get.contains(_))
+ // For filters that only exists in scan, we need to check if they are
supported.
+ val unsupportedFilters = scanFiltersNotInPushDownFilters.filter(
+
!BackendsApiManager.getSparkPlanExecApiInstance.isSupportedScanFilter(_, this))
+ if (unsupportedFilters.nonEmpty) {
+ throw new UnsupportedOperationException(
+ "Found unsupported filter in scan " + unsupportedFilters.mkString(",
"))
+ }
+ val supportedPushDownFilters = pushDownFilters.get
+
.filter(BackendsApiManager.getSparkPlanExecApiInstance.isSupportedScanFilter(_,
this))
+ FilterHandler.combineFilters(supportedPushDownFilters,
scanFiltersNotInPushDownFilters)
+ } else {
+ // todo: When PushDownFilterToScan is not performed, find a way to throw
an
+ // exception to trigger scan fallback when encountering unsupported
scan filters,
+ // instead of simply filtering them out.
+ scanFilters.filter(
+
BackendsApiManager.getSparkPlanExecApiInstance.isSupportedScanFilter(_, this))
+ }
+ }
+
+ /** Returns the filters that already exists in scan. */
+ def scanFilters: Seq[Expression]
+
+ /**
+ * Returns the filters that pushed by
+ * [[org.apache.gluten.extension.columnar.PushDownFilterToScan]].
+ */
+ def pushDownFilters: Option[Seq[Expression]]
+
+ /** Copy the scan with filters that pushed by filterNode. */
+ def withNewPushdownFilters(filters: Seq[Expression]):
BasicScanExecTransformer
Review Comment:
Suggestion:
```
val pushDownFilters: Option[Seq[Expression]]
/** Copy the scan with filters that pushed by filterNode. */
def withNewPushdownFilters(filters: Seq[Expression]):
BasicScanExecTransformer = {
this.copy(pushDownFilters = Some(filters))
}
```
This reduces the number of `withNewPushdownFilters` overrides needed in subclasses, since the trait provides a default implementation.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]