liuyongvs commented on a change in pull request #12851:
URL: https://github.com/apache/flink/pull/12851#discussion_r452604505
##########
File path:
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -449,6 +488,127 @@ public void applyProjection(int[][] projectedFields) {
this.projectedFields =
Arrays.stream(projectedFields).mapToInt(f -> f[0]).toArray();
}
+ @Override
+ public Result applyFilters(List<ResolvedExpression> filters) {
+ List<ResolvedExpression> acceptedFilters = new
ArrayList<>();
+ List<ResolvedExpression> remainingFilters = new
ArrayList<>();
+ for (ResolvedExpression expr : filters) {
+ if (shouldPushDown(expr)) {
+ acceptedFilters.add(expr);
+ } else {
+ remainingFilters.add(expr);
+ }
+ }
+ this.filterPredicates = acceptedFilters;
+ return Result.of(acceptedFilters, remainingFilters);
+ }
+
+ private Boolean shouldPushDown(Expression expr) {
+ if (expr instanceof CallExpression &&
expr.getChildren().size() == 2) {
+ return
shouldPushDownUnaryExpression(expr.getChildren().get(0))
+ &&
shouldPushDownUnaryExpression(expr.getChildren().get(1));
+ }
+ return false;
+ }
+
+ private boolean shouldPushDownUnaryExpression(Expression expr) {
+ if (expr instanceof FieldReferenceExpression) {
+ if
(filterableFields.contains(((FieldReferenceExpression) expr).getName())) {
+ return true;
+ }
+ }
+
+ if (expr instanceof ValueLiteralExpression) {
+ return true;
+ }
+
+ if (expr instanceof CallExpression &&
expr.getChildren().size() == 1) {
+ if (((CallExpression)
expr).getFunctionDefinition().equals(UPPER)
+ || ((CallExpression)
expr).getFunctionDefinition().equals(BuiltInFunctionDefinitions.LOWER)) {
+ return
shouldPushDownUnaryExpression(expr.getChildren().get(0));
+ }
+ }
+ // other resolved expressions return false
+ return false;
+ }
+
+ private Boolean applyPredicatesToRow(Row row) {
+ if (filterPredicates == null) {
+ return true;
+ }
+ for (ResolvedExpression expr : filterPredicates) {
+ if (expr instanceof CallExpression &&
expr.getChildren().size() == 2) {
+ if
(!binaryFilterApplies((CallExpression) expr, row)) {
+ return false;
+ }
+ } else {
+ throw new RuntimeException(expr + " not
supported!");
+ }
+ }
+ return true;
+ }
+
+ private boolean binaryFilterApplies(CallExpression binExpr, Row
row) {
+ List<Expression> children = binExpr.getChildren();
+ Preconditions.checkArgument(children.size() == 2);
+ Tuple2<Comparable, Comparable> tuple2 =
extractValues(binExpr, row);
Review comment:
the code here , which i just refer to TestFilterableSource.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]