JingsongLi commented on a change in pull request #13306: URL: https://github.com/apache/flink/pull/13306#discussion_r483360115
########## File path: flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileSystemFormatFactory.java ########## @@ -83,14 +96,227 @@ private static Properties getOrcProperties(ReadableConfig options) { return orcProperties; } + private boolean isUnaryValid(CallExpression callExpression) { + return callExpression.getChildren().size() == 1 && callExpression.getChildren().get(0) instanceof FieldReferenceExpression; + } + + private boolean isBinaryValid(CallExpression callExpression) { + return callExpression.getChildren().size() == 2 && ((callExpression.getChildren().get(0) instanceof FieldReferenceExpression && callExpression.getChildren().get(1) instanceof ValueLiteralExpression) || + (callExpression.getChildren().get(0) instanceof ValueLiteralExpression && callExpression.getChildren().get(1) instanceof FieldReferenceExpression)); + } + + public OrcSplitReader.Predicate toOrcPredicate(Expression expression) { + if (expression instanceof CallExpression) { + CallExpression callExp = (CallExpression) expression; + FunctionDefinition funcDef = callExp.getFunctionDefinition(); + + if (funcDef == BuiltInFunctionDefinitions.IS_NULL || funcDef == BuiltInFunctionDefinitions.IS_NOT_NULL || funcDef == BuiltInFunctionDefinitions.NOT) { + if (!isUnaryValid(callExp)) { + // not a valid predicate + LOG.debug("Unsupported predicate [{}] cannot be pushed into OrcFileSystemFormatFactory.", callExp); + return null; + } + + PredicateLeaf.Type colType = toOrcType(((FieldReferenceExpression) callExp.getChildren().get(0)).getOutputDataType()); + if (colType == null) { + // unsupported type + LOG.debug("Unsupported predicate [{}] cannot be pushed into OrcTableSource.", callExp); + return null; + } + + String colName = getColumnName(callExp); + + if (funcDef == BuiltInFunctionDefinitions.IS_NULL) { + return new OrcSplitReader.IsNull(colName, colType); + } else if (funcDef == BuiltInFunctionDefinitions.IS_NOT_NULL) { + return new OrcSplitReader.Not( + new OrcSplitReader.IsNull(colName, colType)); + } else { + OrcSplitReader.Predicate c = toOrcPredicate(callExp.getChildren().get(0)); + if (c == null) { + return null; + } else { + return new OrcSplitReader.Not(c); + } + } + } else if (funcDef == BuiltInFunctionDefinitions.OR) { + if (callExp.getChildren().size() < 2) { + return null; + } + Expression left = callExp.getChildren().get(0); + Expression right = callExp.getChildren().get(1); + + OrcSplitReader.Predicate c1 = toOrcPredicate(left); + OrcSplitReader.Predicate c2 = toOrcPredicate(right); + if (c1 == null || c2 == null) { + return null; + } else { + return new OrcSplitReader.Or(c1, c2); + } + } else { + if (!isBinaryValid(callExp)) { + // not a valid predicate + LOG.debug("Unsupported predicate [{}] cannot be pushed into OrcFileSystemFormatFactory.", callExp); + return null; + } + + PredicateLeaf.Type litType = getLiteralType(callExp); + if (litType == null) { + // unsupported literal type + LOG.debug("Unsupported predicate [{}] cannot be pushed into OrcFileSystemFormatFactory.", expression); + return null; + } + + boolean literalOnRight = literalOnRight(callExp); + String colName = getColumnName(callExp); + + // fetch literal and ensure it is serializable + Object literalObj = getLiteral(callExp).get(); + Serializable literal; + // validate that literal is serializable + if (literalObj instanceof Serializable) { + literal = (Serializable) literalObj; + } else { + LOG.warn("Encountered a non-serializable literal of type {}. " + + "Cannot push predicate [{}] into OrcFileSystemFormatFactory. " + + "This is a bug and should be reported.", + literalObj.getClass().getCanonicalName(), expression); + return null; + } + + if (funcDef == BuiltInFunctionDefinitions.EQUALS) { + return new OrcSplitReader.Equals(colName, litType, literal); + } else if (funcDef == BuiltInFunctionDefinitions.NOT_EQUALS) { + return new OrcSplitReader.Not( + new OrcSplitReader.Equals(colName, litType, literal)); + } else if (funcDef == BuiltInFunctionDefinitions.GREATER_THAN) { + if (literalOnRight) { + return new OrcSplitReader.Not( + new OrcSplitReader.LessThanEquals(colName, litType, literal)); + } else { + return new OrcSplitReader.LessThan(colName, litType, literal); + } + } else if (funcDef == BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL) { + if (literalOnRight) { + return new OrcSplitReader.Not( + new OrcSplitReader.LessThan(colName, litType, literal)); + } else { + return new OrcSplitReader.LessThanEquals(colName, litType, literal); + } + } else if (funcDef == BuiltInFunctionDefinitions.LESS_THAN) { + if (literalOnRight) { + return new OrcSplitReader.LessThan(colName, litType, literal); + } else { + return new OrcSplitReader.Not( + new OrcSplitReader.LessThanEquals(colName, litType, literal)); + } + } else if (funcDef == BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL) { + if (literalOnRight) { + return new OrcSplitReader.LessThanEquals(colName, litType, literal); + } else { + return new OrcSplitReader.Not( + new OrcSplitReader.LessThan(colName, litType, literal)); + } + } else { + // unsupported predicate + LOG.debug("Unsupported predicate [{}] cannot be pushed into OrcFileSystemFormatFactory.", expression); + return null; + } + } + } else { + // unsupported predicate + LOG.debug("Unsupported predicate [{}] cannot be pushed into OrcFileSystemFormatFactory.", expression); + return null; + } + } + + private String getColumnName(CallExpression comp) { + if (literalOnRight(comp)) { + return ((FieldReferenceExpression) comp.getChildren().get(0)).getName(); + } else { + return ((FieldReferenceExpression) comp.getChildren().get(1)).getName(); + } + } + + private boolean literalOnRight(CallExpression comp) { + if (comp.getChildren().size() == 1 && comp.getChildren().get(0) instanceof FieldReferenceExpression) { + return true; + } else if (comp.getChildren().get(0) instanceof ValueLiteralExpression + && comp.getChildren().get(1) instanceof FieldReferenceExpression) { + return false; + } else if (comp.getChildren().get(0) instanceof FieldReferenceExpression + && comp.getChildren().get(1) instanceof ValueLiteralExpression) { + return true; + } else { + throw new RuntimeException("Invalid binary comparison."); + } + } + + private PredicateLeaf.Type getLiteralType(CallExpression comp) { + if (literalOnRight(comp)) { + return toOrcType(((ValueLiteralExpression) comp.getChildren().get(1)).getOutputDataType()); + } else { + return toOrcType(((ValueLiteralExpression) comp.getChildren().get(0)).getOutputDataType()); + } + } + + private Optional<?> getLiteral(CallExpression comp) { + if (literalOnRight(comp)) { + ValueLiteralExpression valueLiteralExpression = (ValueLiteralExpression) comp.getChildren().get(1); + return valueLiteralExpression.getValueAs(valueLiteralExpression.getOutputDataType().getConversionClass()); + } else { + ValueLiteralExpression valueLiteralExpression = (ValueLiteralExpression) comp.getChildren().get(0); + return valueLiteralExpression.getValueAs(valueLiteralExpression.getOutputDataType().getConversionClass()); + } + } + + public PredicateLeaf.Type toOrcType(DataType type) { + LogicalTypeRoot ltype = type.getLogicalType().getTypeRoot(); + + if (ltype == LogicalTypeRoot.TINYINT || Review comment: You can use `switch... case...` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org