JingsongLi commented on a change in pull request #13306:
URL: https://github.com/apache/flink/pull/13306#discussion_r483358009



##########
File path: 
flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileSystemFormatFactory.java
##########
@@ -83,14 +96,227 @@ private static Properties getOrcProperties(ReadableConfig 
options) {
                return orcProperties;
        }
 
+       private boolean isUnaryValid(CallExpression callExpression) {

Review comment:
       Can you move this code into a separate class, e.g. `OrcFilters`?

##########
File path: 
flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileSystemFormatFactory.java
##########
@@ -83,14 +96,227 @@ private static Properties getOrcProperties(ReadableConfig 
options) {
                return orcProperties;
        }
 
+       private boolean isUnaryValid(CallExpression callExpression) {
+               return callExpression.getChildren().size() == 1 && 
callExpression.getChildren().get(0) instanceof FieldReferenceExpression;
+       }
+
+       private boolean isBinaryValid(CallExpression callExpression) {
+               return callExpression.getChildren().size() == 2 && 
((callExpression.getChildren().get(0) instanceof FieldReferenceExpression && 
callExpression.getChildren().get(1) instanceof ValueLiteralExpression) ||
+                               (callExpression.getChildren().get(0) instanceof 
ValueLiteralExpression && callExpression.getChildren().get(1) instanceof 
FieldReferenceExpression));
+       }
+
+       public OrcSplitReader.Predicate toOrcPredicate(Expression expression) {
+               if (expression instanceof CallExpression) {
+                       CallExpression callExp = (CallExpression) expression;
+                       FunctionDefinition funcDef = 
callExp.getFunctionDefinition();
+
+                       if (funcDef == BuiltInFunctionDefinitions.IS_NULL || 
funcDef == BuiltInFunctionDefinitions.IS_NOT_NULL || funcDef == 
BuiltInFunctionDefinitions.NOT) {

Review comment:
       Maybe we can have a `static final ImmutableMap<FunctionDefinition, 
Function<CallExpression, Expression>> FILTERS`. A functional style would make 
the code cleaner.

##########
File path: 
flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileSystemFormatFactory.java
##########
@@ -83,14 +96,227 @@ private static Properties getOrcProperties(ReadableConfig 
options) {
                return orcProperties;
        }
 
+       private boolean isUnaryValid(CallExpression callExpression) {
+               return callExpression.getChildren().size() == 1 && 
callExpression.getChildren().get(0) instanceof FieldReferenceExpression;
+       }
+
+       private boolean isBinaryValid(CallExpression callExpression) {
+               return callExpression.getChildren().size() == 2 && 
((callExpression.getChildren().get(0) instanceof FieldReferenceExpression && 
callExpression.getChildren().get(1) instanceof ValueLiteralExpression) ||
+                               (callExpression.getChildren().get(0) instanceof 
ValueLiteralExpression && callExpression.getChildren().get(1) instanceof 
FieldReferenceExpression));
+       }
+
+       public OrcSplitReader.Predicate toOrcPredicate(Expression expression) {
+               if (expression instanceof CallExpression) {
+                       CallExpression callExp = (CallExpression) expression;
+                       FunctionDefinition funcDef = 
callExp.getFunctionDefinition();
+
+                       if (funcDef == BuiltInFunctionDefinitions.IS_NULL || 
funcDef == BuiltInFunctionDefinitions.IS_NOT_NULL || funcDef == 
BuiltInFunctionDefinitions.NOT) {
+                               if (!isUnaryValid(callExp)) {
+                                       // not a valid predicate
+                                       LOG.debug("Unsupported predicate [{}] 
cannot be pushed into OrcFileSystemFormatFactory.", callExp);
+                                       return null;
+                               }
+
+                               PredicateLeaf.Type colType = 
toOrcType(((FieldReferenceExpression) 
callExp.getChildren().get(0)).getOutputDataType());
+                               if (colType == null) {
+                                       // unsupported type
+                                       LOG.debug("Unsupported predicate [{}] 
cannot be pushed into OrcTableSource.", callExp);
+                                       return null;
+                               }
+
+                               String colName = getColumnName(callExp);
+
+                               if (funcDef == 
BuiltInFunctionDefinitions.IS_NULL) {
+                                       return new 
OrcSplitReader.IsNull(colName, colType);
+                               } else if (funcDef == 
BuiltInFunctionDefinitions.IS_NOT_NULL) {
+                                       return new OrcSplitReader.Not(
+                                                       new 
OrcSplitReader.IsNull(colName, colType));
+                               } else {
+                                       OrcSplitReader.Predicate c = 
toOrcPredicate(callExp.getChildren().get(0));
+                                       if (c == null) {
+                                               return null;
+                                       } else {
+                                               return new 
OrcSplitReader.Not(c);
+                                       }
+                               }
+                       } else if (funcDef == BuiltInFunctionDefinitions.OR) {
+                               if (callExp.getChildren().size() < 2) {
+                                       return null;
+                               }
+                               Expression left = callExp.getChildren().get(0);
+                               Expression right = callExp.getChildren().get(1);
+
+                               OrcSplitReader.Predicate c1 = 
toOrcPredicate(left);
+                               OrcSplitReader.Predicate c2 = 
toOrcPredicate(right);
+                               if (c1 == null || c2 == null) {
+                                       return null;
+                               } else {
+                                       return new OrcSplitReader.Or(c1, c2);
+                               }
+                       } else {
+                               if (!isBinaryValid(callExp)) {
+                                       // not a valid predicate
+                                       LOG.debug("Unsupported predicate [{}] 
cannot be pushed into OrcFileSystemFormatFactory.", callExp);
+                                       return null;
+                               }
+
+                               PredicateLeaf.Type litType = 
getLiteralType(callExp);
+                               if (litType == null) {
+                                       // unsupported literal type
+                                       LOG.debug("Unsupported predicate [{}] 
cannot be pushed into OrcFileSystemFormatFactory.", expression);
+                                       return null;
+                               }
+
+                               boolean literalOnRight = 
literalOnRight(callExp);
+                               String colName = getColumnName(callExp);
+
+                               // fetch literal and ensure it is serializable
+                               Object literalObj = getLiteral(callExp).get();
+                               Serializable literal;
+                               // validate that literal is serializable
+                               if (literalObj instanceof Serializable) {
+                                       literal = (Serializable) literalObj;
+                               } else {
+                                       LOG.warn("Encountered a 
non-serializable literal of type {}. " +
+                                                                       "Cannot 
push predicate [{}] into OrcFileSystemFormatFactory. " +
+                                                                       "This 
is a bug and should be reported.",
+                                                       
literalObj.getClass().getCanonicalName(), expression);
+                                       return null;
+                               }
+
+                               if (funcDef == 
BuiltInFunctionDefinitions.EQUALS) {
+                                       return new 
OrcSplitReader.Equals(colName, litType, literal);
+                               } else if (funcDef == 
BuiltInFunctionDefinitions.NOT_EQUALS) {
+                                       return new OrcSplitReader.Not(
+                                                       new 
OrcSplitReader.Equals(colName, litType, literal));
+                               } else if (funcDef == 
BuiltInFunctionDefinitions.GREATER_THAN) {
+                                       if (literalOnRight) {
+                                               return new OrcSplitReader.Not(
+                                                               new 
OrcSplitReader.LessThanEquals(colName, litType, literal));

Review comment:
       Looks like a bug here? We should add more cases.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to