[
https://issues.apache.org/jira/browse/FLINK-26945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17703653#comment-17703653
]
jackylau edited comment on FLINK-26945 at 3/22/23 1:05 PM:
-----------------------------------------------------------
I have also tried a ResolverRule that, like Spark, casts the argument to DATE in
advance. It works for the Table API but not for SQL, because SQL queries do not go
through this logic.
{code:java}
final class DateTimeOperationsResolverRule implements ResolverRule {
// Single shared DateTypeExtractor instance; ExpressionResolverVisitor applies it to the
// first DATE_SUB argument to decide whether that argument should be wrapped in a CAST.
private static final DateTypeExtractor DATE_TYPE_EXTRACTOR = new
DateTypeExtractor();
/**
 * Passes every given expression through an {@link ExpressionResolverVisitor} and returns
 * the visited expressions in their original order.
 *
 * @param expression the expressions to visit
 * @param context resolution context handed to the visitor
 * @return one visited expression per input expression
 */
@Override
public List<Expression> apply(List<Expression> expression,
        ResolutionContext context) {
    // The visitor only holds the context, so one instance serves all expressions.
    final ExpressionResolverVisitor dateSubRewriter = new ExpressionResolverVisitor(context);
    return expression.stream()
            .map(candidate -> candidate.accept(dateSubRewriter))
            .collect(Collectors.toList());
}
private static class ExpressionResolverVisitor extends
RuleExpressionVisitor<Expression> {
ExpressionResolverVisitor(ResolutionContext context) {
super(context);
}
@Override
public Expression visit(UnresolvedCallExpression unresolvedCall) {
if (unresolvedCall.getFunctionDefinition() ==
BuiltInFunctionDefinitions.DATE_SUB) {
List<Expression> children = unresolvedCall.getChildren();
Expression date = children.get(0);
resolutionContext.getOutputDataType();
if (date.accept(DATE_TYPE_EXTRACTOR)) {
Expression castedDate =
unresolvedCall(
BuiltInFunctionDefinitions.CAST,
date,
typeLiteral(DataTypes.DATE()));
return unresolvedCall(
BuiltInFunctionDefinitions.DATE_SUB, castedDate,
children.get(1));
}
}
return unresolvedCall;
}
@Override
protected Expression defaultMethod(Expression expression) {
return expression;
}
}
/**
 * Visitor that answers whether an expression is a field reference or value literal whose
 * logical type is a character string, {@code TIMESTAMP_WITHOUT_TIME_ZONE} or
 * {@code TIMESTAMP_WITH_LOCAL_TIME_ZONE}. Every other kind of expression yields
 * {@code false}.
 */
private static class DateTypeExtractor extends
        ApiExpressionDefaultVisitor<Boolean> {

    /** Any expression without a dedicated {@code visit} overload is not a candidate. */
    @Override
    protected Boolean defaultMethod(Expression expression) {
        return false;
    }

    /** for table api. */
    @Override
    public Boolean visit(FieldReferenceExpression fieldReference) {
        return isStringOrTimestamp(fieldReference.getOutputDataType().getLogicalType());
    }

    /** Checks the output type of a literal with the same rule as for field references. */
    @Override
    public Boolean visit(ValueLiteralExpression valueLiteral) {
        return isStringOrTimestamp(valueLiteral.getOutputDataType().getLogicalType());
    }

    /**
     * Single definition of the type test shared by both {@code visit} overloads.
     *
     * @param type logical type of the visited expression
     * @return {@code true} for character strings and the two timestamp roots listed below
     */
    private static boolean isStringOrTimestamp(LogicalType type) {
        return type.isAnyOf(LogicalTypeFamily.CHARACTER_STRING)
                || type.isAnyOf(
                        LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE,
                        LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
    }
}
} {code}
I have also tried FlinkConvertletTable, but it only works for SQL and not for the
Table API. The reason is that the Table API does not go through this logic; it only
converts expressions to RexNode.
So I have not found a good implementation that, like Spark, converts string/timestamp
types to int in advance.
Do you know of a better implementation?
was (Author: jackylau):
I have also tried a ResolverRule that, like Spark, casts the argument to DATE in
advance. It works for the Table API but not for SQL, because SQL queries do not go
through this logic.
{code:java}
final class DateTimeOperationsResolverRule implements ResolverRule {
// Single shared DateTypeExtractor instance; ExpressionResolverVisitor applies it to the
// first DATE_SUB argument to decide whether that argument should be wrapped in a CAST.
private static final DateTypeExtractor DATE_TYPE_EXTRACTOR = new
DateTypeExtractor();
/**
 * Passes every given expression through an {@link ExpressionResolverVisitor} and returns
 * the visited expressions in their original order.
 *
 * @param expression the expressions to visit
 * @param context resolution context handed to the visitor
 * @return one visited expression per input expression
 */
@Override
public List<Expression> apply(List<Expression> expression,
        ResolutionContext context) {
    // The visitor only holds the context, so one instance serves all expressions.
    final ExpressionResolverVisitor dateSubRewriter = new ExpressionResolverVisitor(context);
    return expression.stream()
            .map(candidate -> candidate.accept(dateSubRewriter))
            .collect(Collectors.toList());
}
private static class ExpressionResolverVisitor extends
RuleExpressionVisitor<Expression> {
ExpressionResolverVisitor(ResolutionContext context) {
super(context);
}
@Override
public Expression visit(UnresolvedCallExpression unresolvedCall) {
if (unresolvedCall.getFunctionDefinition() ==
BuiltInFunctionDefinitions.DATE_SUB) {
List<Expression> children = unresolvedCall.getChildren();
Expression date = children.get(0);
resolutionContext.getOutputDataType();
if (date.accept(DATE_TYPE_EXTRACTOR)) {
Expression castedDate =
unresolvedCall(
BuiltInFunctionDefinitions.CAST,
date,
typeLiteral(DataTypes.DATE()));
return unresolvedCall(
BuiltInFunctionDefinitions.DATE_SUB, castedDate,
children.get(1));
}
}
return unresolvedCall;
}
@Override
protected Expression defaultMethod(Expression expression) {
return expression;
}
}
/**
 * Visitor that answers whether an expression is a field reference or value literal whose
 * logical type is a character string, {@code TIMESTAMP_WITHOUT_TIME_ZONE} or
 * {@code TIMESTAMP_WITH_LOCAL_TIME_ZONE}. Every other kind of expression yields
 * {@code false}.
 */
private static class DateTypeExtractor extends
        ApiExpressionDefaultVisitor<Boolean> {

    /** Any expression without a dedicated {@code visit} overload is not a candidate. */
    @Override
    protected Boolean defaultMethod(Expression expression) {
        return false;
    }

    /** for table api. */
    @Override
    public Boolean visit(FieldReferenceExpression fieldReference) {
        return isStringOrTimestamp(fieldReference.getOutputDataType().getLogicalType());
    }

    /** Checks the output type of a literal with the same rule as for field references. */
    @Override
    public Boolean visit(ValueLiteralExpression valueLiteral) {
        return isStringOrTimestamp(valueLiteral.getOutputDataType().getLogicalType());
    }

    /**
     * Single definition of the type test shared by both {@code visit} overloads.
     *
     * @param type logical type of the visited expression
     * @return {@code true} for character strings and the two timestamp roots listed below
     */
    private static boolean isStringOrTimestamp(LogicalType type) {
        return type.isAnyOf(LogicalTypeFamily.CHARACTER_STRING)
                || type.isAnyOf(
                        LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE,
                        LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
    }
}
} {code}
I have also tried FlinkConvertletTable, but it only works for SQL and not for the
Table API.
So I have not found a good implementation that, like Spark, converts string/timestamp
types to int in advance.
Do you know of a better implementation?
> Add DATE_SUB supported in SQL & Table API
> -----------------------------------------
>
> Key: FLINK-26945
> URL: https://issues.apache.org/jira/browse/FLINK-26945
> Project: Flink
> Issue Type: Sub-task
> Components: Table SQL / API
> Reporter: dalongliu
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.17.0
>
>
> Returns the date {{numDays}} before {{{}startDate{}}}.
> Syntax:
> {code:java}
> date_sub(startDate, numDays) {code}
> Arguments:
> * {{{}startDate{}}}: A DATE expression.
> * {{{}numDays{}}}: An INTEGER expression.
> Returns:
> A DATE.
> If {{numDays}} is negative, abs(numDays) days are added to {{{}startDate{}}}.
> If the result date overflows the date range the function raises an error.
> Examples:
> {code:java}
> > SELECT date_sub('2016-07-30', 1);
> 2016-07-29 {code}
> See more:
> *
> [Spark|https://spark.apache.org/docs/latest/sql-ref-functions-builtin.html#date-and-timestamp-functions]
> * [Hive|https://cwiki.apache.org/confluence/display/hive/languagemanual+udf]
--
This message was sent by Atlassian Jira
(v8.20.10#820010)