godfreyhe commented on a change in pull request #12851:
URL: https://github.com/apache/flink/pull/12851#discussion_r452219045
##########
File path:
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -449,6 +488,127 @@ public void applyProjection(int[][] projectedFields) {
this.projectedFields =
Arrays.stream(projectedFields).mapToInt(f -> f[0]).toArray();
}
+ @Override
+ public Result applyFilters(List<ResolvedExpression> filters) {
+ List<ResolvedExpression> acceptedFilters = new
ArrayList<>();
+ List<ResolvedExpression> remainingFilters = new
ArrayList<>();
+ for (ResolvedExpression expr : filters) {
+ if (shouldPushDown(expr)) {
+ acceptedFilters.add(expr);
+ } else {
+ remainingFilters.add(expr);
+ }
+ }
+ this.filterPredicates = acceptedFilters;
+ return Result.of(acceptedFilters, remainingFilters);
+ }
+
+ private Boolean shouldPushDown(Expression expr) {
+ if (expr instanceof CallExpression &&
expr.getChildren().size() == 2) {
+ return
shouldPushDownUnaryExpression(expr.getChildren().get(0))
+ &&
shouldPushDownUnaryExpression(expr.getChildren().get(1));
+ }
+ return false;
+ }
+
+ private boolean shouldPushDownUnaryExpression(Expression expr) {
+ if (expr instanceof FieldReferenceExpression) {
+ if
(filterableFields.contains(((FieldReferenceExpression) expr).getName())) {
+ return true;
+ }
+ }
+
+ if (expr instanceof ValueLiteralExpression) {
+ return true;
+ }
+
+ if (expr instanceof CallExpression &&
expr.getChildren().size() == 1) {
+ if (((CallExpression)
expr).getFunctionDefinition().equals(UPPER)
+ || ((CallExpression)
expr).getFunctionDefinition().equals(BuiltInFunctionDefinitions.LOWER)) {
+ return
shouldPushDownUnaryExpression(expr.getChildren().get(0));
+ }
+ }
+ // other resolved expressions return false
+ return false;
+ }
+
+ private Boolean applyPredicatesToRow(Row row) {
+ if (filterPredicates == null) {
+ return true;
+ }
+ for (ResolvedExpression expr : filterPredicates) {
+ if (expr instanceof CallExpression &&
expr.getChildren().size() == 2) {
+ if
(!binaryFilterApplies((CallExpression) expr, row)) {
+ return false;
+ }
+ } else {
+ throw new RuntimeException(expr + " not
supported!");
+ }
+ }
+ return true;
+ }
+
+ private boolean binaryFilterApplies(CallExpression binExpr, Row
row) {
+ List<Expression> children = binExpr.getChildren();
+ Preconditions.checkArgument(children.size() == 2);
+ Tuple2<Comparable, Comparable> tuple2 =
extractValues(binExpr, row);
Review comment:
we should limit the supported type of filter fields in `applyFilters`
method, otherwise we can't extract the value to `Comparable` directly. Some
types are not `Comparable`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]