JingsongLi commented on a change in pull request #13306:
URL: https://github.com/apache/flink/pull/13306#discussion_r483360115



##########
File path: 
flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileSystemFormatFactory.java
##########
@@ -83,14 +96,227 @@ private static Properties getOrcProperties(ReadableConfig 
options) {
                return orcProperties;
        }
 
+       private boolean isUnaryValid(CallExpression callExpression) {
+               return callExpression.getChildren().size() == 1 && 
callExpression.getChildren().get(0) instanceof FieldReferenceExpression;
+       }
+
+       private boolean isBinaryValid(CallExpression callExpression) {
+               return callExpression.getChildren().size() == 2 && 
((callExpression.getChildren().get(0) instanceof FieldReferenceExpression && 
callExpression.getChildren().get(1) instanceof ValueLiteralExpression) ||
+                               (callExpression.getChildren().get(0) instanceof 
ValueLiteralExpression && callExpression.getChildren().get(1) instanceof 
FieldReferenceExpression));
+       }
+
+       public OrcSplitReader.Predicate toOrcPredicate(Expression expression) {
+               if (expression instanceof CallExpression) {
+                       CallExpression callExp = (CallExpression) expression;
+                       FunctionDefinition funcDef = 
callExp.getFunctionDefinition();
+
+                       if (funcDef == BuiltInFunctionDefinitions.IS_NULL || 
funcDef == BuiltInFunctionDefinitions.IS_NOT_NULL || funcDef == 
BuiltInFunctionDefinitions.NOT) {
+                               if (!isUnaryValid(callExp)) {
+                                       // not a valid predicate
+                                       LOG.debug("Unsupported predicate [{}] 
cannot be pushed into OrcFileSystemFormatFactory.", callExp);
+                                       return null;
+                               }
+
+                               PredicateLeaf.Type colType = 
toOrcType(((FieldReferenceExpression) 
callExp.getChildren().get(0)).getOutputDataType());
+                               if (colType == null) {
+                                       // unsupported type
+                                       LOG.debug("Unsupported predicate [{}] 
cannot be pushed into OrcTableSource.", callExp);
+                                       return null;
+                               }
+
+                               String colName = getColumnName(callExp);
+
+                               if (funcDef == 
BuiltInFunctionDefinitions.IS_NULL) {
+                                       return new 
OrcSplitReader.IsNull(colName, colType);
+                               } else if (funcDef == 
BuiltInFunctionDefinitions.IS_NOT_NULL) {
+                                       return new OrcSplitReader.Not(
+                                                       new 
OrcSplitReader.IsNull(colName, colType));
+                               } else {
+                                       OrcSplitReader.Predicate c = 
toOrcPredicate(callExp.getChildren().get(0));
+                                       if (c == null) {
+                                               return null;
+                                       } else {
+                                               return new 
OrcSplitReader.Not(c);
+                                       }
+                               }
+                       } else if (funcDef == BuiltInFunctionDefinitions.OR) {
+                               if (callExp.getChildren().size() < 2) {
+                                       return null;
+                               }
+                               Expression left = callExp.getChildren().get(0);
+                               Expression right = callExp.getChildren().get(1);
+
+                               OrcSplitReader.Predicate c1 = 
toOrcPredicate(left);
+                               OrcSplitReader.Predicate c2 = 
toOrcPredicate(right);
+                               if (c1 == null || c2 == null) {
+                                       return null;
+                               } else {
+                                       return new OrcSplitReader.Or(c1, c2);
+                               }
+                       } else {
+                               if (!isBinaryValid(callExp)) {
+                                       // not a valid predicate
+                                       LOG.debug("Unsupported predicate [{}] 
cannot be pushed into OrcFileSystemFormatFactory.", callExp);
+                                       return null;
+                               }
+
+                               PredicateLeaf.Type litType = 
getLiteralType(callExp);
+                               if (litType == null) {
+                                       // unsupported literal type
+                                       LOG.debug("Unsupported predicate [{}] 
cannot be pushed into OrcFileSystemFormatFactory.", expression);
+                                       return null;
+                               }
+
+                               boolean literalOnRight = 
literalOnRight(callExp);
+                               String colName = getColumnName(callExp);
+
+                               // fetch literal and ensure it is serializable
+                               Object literalObj = getLiteral(callExp).get();
+                               Serializable literal;
+                               // validate that literal is serializable
+                               if (literalObj instanceof Serializable) {
+                                       literal = (Serializable) literalObj;
+                               } else {
+                                       LOG.warn("Encountered a 
non-serializable literal of type {}. " +
+                                                                       "Cannot 
push predicate [{}] into OrcFileSystemFormatFactory. " +
+                                                                       "This 
is a bug and should be reported.",
+                                                       
literalObj.getClass().getCanonicalName(), expression);
+                                       return null;
+                               }
+
+                               if (funcDef == 
BuiltInFunctionDefinitions.EQUALS) {
+                                       return new 
OrcSplitReader.Equals(colName, litType, literal);
+                               } else if (funcDef == 
BuiltInFunctionDefinitions.NOT_EQUALS) {
+                                       return new OrcSplitReader.Not(
+                                                       new 
OrcSplitReader.Equals(colName, litType, literal));
+                               } else if (funcDef == 
BuiltInFunctionDefinitions.GREATER_THAN) {
+                                       if (literalOnRight) {
+                                               return new OrcSplitReader.Not(
+                                                               new 
OrcSplitReader.LessThanEquals(colName, litType, literal));
+                                       } else {
+                                               return new 
OrcSplitReader.LessThan(colName, litType, literal);
+                                       }
+                               } else if (funcDef == 
BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL) {
+                                       if (literalOnRight) {
+                                               return new OrcSplitReader.Not(
+                                                               new 
OrcSplitReader.LessThan(colName, litType, literal));
+                                       } else {
+                                               return new 
OrcSplitReader.LessThanEquals(colName, litType, literal);
+                                       }
+                               } else if (funcDef == 
BuiltInFunctionDefinitions.LESS_THAN) {
+                                       if (literalOnRight) {
+                                               return new 
OrcSplitReader.LessThan(colName, litType, literal);
+                                       } else {
+                                               return new OrcSplitReader.Not(
+                                                               new 
OrcSplitReader.LessThanEquals(colName, litType, literal));
+                                       }
+                               } else if (funcDef == 
BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL) {
+                                       if (literalOnRight) {
+                                               return new 
OrcSplitReader.LessThanEquals(colName, litType, literal);
+                                       } else {
+                                               return new OrcSplitReader.Not(
+                                                               new 
OrcSplitReader.LessThan(colName, litType, literal));
+                                       }
+                               } else {
+                                       // unsupported predicate
+                                       LOG.debug("Unsupported predicate [{}] 
cannot be pushed into OrcFileSystemFormatFactory.", expression);
+                                       return null;
+                               }
+                       }
+               } else {
+                       // unsupported predicate
+                       LOG.debug("Unsupported predicate [{}] cannot be pushed 
into OrcFileSystemFormatFactory.", expression);
+                       return null;
+               }
+       }
+
+       private String getColumnName(CallExpression comp) {
+               if (literalOnRight(comp)) {
+                       return ((FieldReferenceExpression) 
comp.getChildren().get(0)).getName();
+               } else {
+                       return ((FieldReferenceExpression) 
comp.getChildren().get(1)).getName();
+               }
+       }
+
+       private boolean literalOnRight(CallExpression comp) {
+               if (comp.getChildren().size() == 1 && comp.getChildren().get(0) 
instanceof FieldReferenceExpression) {
+                       return true;
+               } else if (comp.getChildren().get(0) instanceof 
ValueLiteralExpression
+                               && comp.getChildren().get(1) instanceof 
FieldReferenceExpression) {
+                       return false;
+               } else if (comp.getChildren().get(0) instanceof 
FieldReferenceExpression
+                               && comp.getChildren().get(1) instanceof 
ValueLiteralExpression) {
+                       return true;
+               } else {
+                       throw new RuntimeException("Invalid binary 
comparison.");
+               }
+       }
+
+       private PredicateLeaf.Type getLiteralType(CallExpression comp) {
+               if (literalOnRight(comp)) {
+                       return toOrcType(((ValueLiteralExpression) 
comp.getChildren().get(1)).getOutputDataType());
+               } else {
+                       return toOrcType(((ValueLiteralExpression) 
comp.getChildren().get(0)).getOutputDataType());
+               }
+       }
+
+       private Optional<?> getLiteral(CallExpression comp) {
+               if (literalOnRight(comp)) {
+                       ValueLiteralExpression valueLiteralExpression = 
(ValueLiteralExpression) comp.getChildren().get(1);
+                       return 
valueLiteralExpression.getValueAs(valueLiteralExpression.getOutputDataType().getConversionClass());
+               } else {
+                       ValueLiteralExpression valueLiteralExpression = 
(ValueLiteralExpression) comp.getChildren().get(0);
+                       return 
valueLiteralExpression.getValueAs(valueLiteralExpression.getOutputDataType().getConversionClass());
+               }
+       }
+
+       public PredicateLeaf.Type toOrcType(DataType type) {
+               LogicalTypeRoot ltype = type.getLogicalType().getTypeRoot();
+
+               if (ltype == LogicalTypeRoot.TINYINT ||

Review comment:
       You can use a `switch ... case` statement on `ltype` here instead of the chained `==` / `||` comparisons.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to