liuyongvs commented on a change in pull request #12851:
URL: https://github.com/apache/flink/pull/12851#discussion_r452591843
##########
File path:
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -449,6 +488,127 @@ public void applyProjection(int[][] projectedFields) {
this.projectedFields =
Arrays.stream(projectedFields).mapToInt(f -> f[0]).toArray();
}
+ @Override
+ public Result applyFilters(List<ResolvedExpression> filters) {
+ List<ResolvedExpression> acceptedFilters = new
ArrayList<>();
+ List<ResolvedExpression> remainingFilters = new
ArrayList<>();
+ for (ResolvedExpression expr : filters) {
+ if (shouldPushDown(expr)) {
+ acceptedFilters.add(expr);
+ } else {
+ remainingFilters.add(expr);
+ }
+ }
+ this.filterPredicates = acceptedFilters;
+ return Result.of(acceptedFilters, remainingFilters);
+ }
+
+ private Boolean shouldPushDown(Expression expr) {
+ if (expr instanceof CallExpression &&
expr.getChildren().size() == 2) {
+ return
shouldPushDownUnaryExpression(expr.getChildren().get(0))
+ &&
shouldPushDownUnaryExpression(expr.getChildren().get(1));
+ }
+ return false;
+ }
+
+ private boolean shouldPushDownUnaryExpression(Expression expr) {
+ if (expr instanceof FieldReferenceExpression) {
+ if
(filterableFields.contains(((FieldReferenceExpression) expr).getName())) {
+ return true;
+ }
+ }
+
+ if (expr instanceof ValueLiteralExpression) {
+ return true;
+ }
+
+ if (expr instanceof CallExpression &&
expr.getChildren().size() == 1) {
+ if (((CallExpression)
expr).getFunctionDefinition().equals(UPPER)
+ || ((CallExpression)
expr).getFunctionDefinition().equals(BuiltInFunctionDefinitions.LOWER)) {
+ return
shouldPushDownUnaryExpression(expr.getChildren().get(0));
+ }
+ }
+ // other resolved expressions return false
+ return false;
+ }
+
+ private Boolean applyPredicatesToRow(Row row) {
+ if (filterPredicates == null) {
+ return true;
+ }
+ for (ResolvedExpression expr : filterPredicates) {
+ if (expr instanceof CallExpression &&
expr.getChildren().size() == 2) {
+ if
(!binaryFilterApplies((CallExpression) expr, row)) {
+ return false;
+ }
+ } else {
+ throw new RuntimeException(expr + " not
supported!");
+ }
+ }
+ return true;
+ }
+
+ private boolean binaryFilterApplies(CallExpression binExpr, Row
row) {
+ List<Expression> children = binExpr.getChildren();
+ Preconditions.checkArgument(children.size() == 2);
+ Tuple2<Comparable, Comparable> tuple2 =
extractValues(binExpr, row);
+ Comparable lhsValue = tuple2._1;
+ Comparable rhsValue = tuple2._2;
+ FunctionDefinition functionDefinition =
binExpr.getFunctionDefinition();
+
+ if
(BuiltInFunctionDefinitions.GREATER_THAN.equals(functionDefinition)) {
+ return lhsValue.compareTo(rhsValue) > 0;
+ } else if
(BuiltInFunctionDefinitions.LESS_THAN.equals(functionDefinition)) {
+ return lhsValue.compareTo(rhsValue) < 0;
+ } else if
(BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL.equals(functionDefinition)) {
+ return lhsValue.compareTo(rhsValue) >= 0;
+ } else if
(BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL.equals(functionDefinition)) {
+ return lhsValue.compareTo(rhsValue) <= 0;
+ } else if
(BuiltInFunctionDefinitions.EQUALS.equals(functionDefinition)) {
+ return lhsValue.compareTo(rhsValue) == 0;
+ } else if
(BuiltInFunctionDefinitions.NOT_EQUALS.equals(functionDefinition)) {
+ return lhsValue.compareTo(rhsValue) != 0;
+ } else {
+ return false;
+ }
+ }
+
+ private Tuple2<Comparable, Comparable>
extractValues(CallExpression binExpr, Row row) {
+ List<Expression> children = binExpr.getChildren();
+ Preconditions.checkArgument(children.size() == 2);
+ return new Tuple2(getValue(children.get(0), row),
getValue(children.get(1), row));
+ }
+
+ private Comparable getValue(Expression expr, Row row) {
+ if (expr instanceof ValueLiteralExpression) {
+ Optional value = ((ValueLiteralExpression)
expr).getValueAs(((ValueLiteralExpression)
expr).getOutputDataType().getConversionClass());
+ if (value.isPresent()) {
+ return (Comparable) value.get();
+ } else {
+ return null;
+ }
+ }
+
+ if (expr instanceof FieldReferenceExpression) {
+ RowTypeInfo rowTypeInfo = new
RowTypeInfo(TypeConversions.fromDataTypeToLegacyInfo(physicalSchema.getFieldDataTypes()),
physicalSchema.getFieldNames());
+ int idx =
rowTypeInfo.getFieldIndex(((FieldReferenceExpression) expr).getName());
Review comment:
ok, changed to
int idx =
Arrays.asList(physicalSchema.getFieldNames()).indexOf(((FieldReferenceExpression)
expr).getName());
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]