[
https://issues.apache.org/jira/browse/FLINK-26945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17700971#comment-17700971
]
jackylau edited comment on FLINK-26945 at 3/16/23 4:34 AM:
-----------------------------------------------------------
hi [~jark] [~twalthr] [~snuyanzin], while implementing the DATE_SUB built-in
function I found that the current built-in function implementation lacks samples
and is not developer friendly. So I have 3 different implementations; which one
is best?
The type definition looks like this:
{code:java}
public static final BuiltInFunctionDefinition DATE_SUB =
        BuiltInFunctionDefinition.newBuilder()
                .name("DATE_SUB")
                .kind(SCALAR)
                .inputTypeStrategy(
                        sequence(
                                or(
                                        logical(LogicalTypeRoot.DATE),
                                        logical(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE),
                                        logical(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE),
                                        logical(LogicalTypeFamily.CHARACTER_STRING)),
                                or(
                                        InputTypeStrategies.explicit(TINYINT()),
                                        InputTypeStrategies.explicit(SMALLINT()),
                                        InputTypeStrategies.explicit(INT()))))
                .outputTypeStrategy(nullableIfArgs(explicit(DATE())))
                .runtimeClass("org.apache.flink.table.runtime.functions.scalar.DateSubFunction")
                .build();
{code}
1) Hive style: I think it is not good, because the runtime function has to branch over every supported input type by hand.
{code:java}
public @Nullable Object eval(Object startDate, Object days) {
if (startDate == null || days == null) {
return null;
} int start = 0;
if (startDateLogicalType.is(LogicalTypeFamily.CHARACTER_STRING)) {
start = BinaryStringDataUtil.toDate((BinaryStringData) startDate);
} else if
(startDateLogicalType.is(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)) {
start =
(int)
(((TimestampData) startDate).getMillisecond()
/ DateTimeUtils.MILLIS_PER_DAY);
} else if
(startDateLogicalType.is(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE)) {
DateTimeUtils.timestampWithLocalZoneToDate((TimestampData)
startDate, LOCAL_TZ);
} else if (startDateLogicalType.is(LogicalTypeRoot.DATE)) {
start = (int) startDate;
} else {
throw new FlinkRuntimeException(
"DATE_SUB() don't support argument startDate type " +
startDate);
} return start - ((Number) days).intValue();
} {code}
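By the way, the final int subtraction works because the internal representation of DATE is the number of days since epoch. A tiny standalone illustration from my side (plain Java, no Flink types, just to show the expected semantics):
{code:java}
import java.time.LocalDate;

public class DateSubSemantics {
    public static void main(String[] args) {
        // DATE is internally an int: days since 1970-01-01
        int start = (int) LocalDate.parse("2016-07-30").toEpochDay();
        int numDays = 1;
        // DATE_SUB('2016-07-30', 1) boils down to int subtraction
        int result = start - numDays;
        System.out.println(LocalDate.ofEpochDay(result)); // prints 2016-07-29
    }
}
{code}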
2) Spark style: Spark converts the String/Timestamp type to Date in advance, and
then simply treats it as an int inside the DateSub function, like this:
{code:scala}
// DateSub treats start as an int, hence start.asInstanceOf[Int]
case class DateSub(startDate: Expression, days: Expression)
    extends BinaryExpression with ExpectsInputTypes with NullIntolerant {

  override def left: Expression = startDate
  override def right: Expression = days

  override def inputTypes: Seq[AbstractDataType] =
    Seq(DateType, TypeCollection(IntegerType, ShortType, ByteType))

  override def dataType: DataType = DateType

  override def nullSafeEval(start: Any, d: Any): Any = {
    start.asInstanceOf[Int] - d.asInstanceOf[Number].intValue()
  }

  override def prettyName: String = "date_sub"
}

// converts the String/Timestamp type to Date in advance
object DateTimeOperations extends TypeCoercionRule {
  override val transform: PartialFunction[Expression, Expression] = {
    // Skip nodes whose children have not been resolved yet.
    case e if !e.childrenResolved => e
    case d @ DateAdd(AnyTimestampType(), _) =>
      d.copy(startDate = Cast(d.startDate, DateType))
    case d @ DateAdd(StringType(), _) =>
      d.copy(startDate = Cast(d.startDate, DateType))
    case d @ DateSub(AnyTimestampType(), _) =>
      d.copy(startDate = Cast(d.startDate, DateType))
    case d @ DateSub(StringType(), _) =>
      d.copy(startDate = Cast(d.startDate, DateType))
  }
}
{code}
If I follow the implementation of Spark, it needs to be implemented by a
{color:#ff0000}*org.apache.flink.table.planner.expressions.converter.converters.CustomizedConverter*{color}
like this. But the CustomizedConverter will only be called in
*{color:#ff0000}ExpressionEvaluatorFactory.createEvaluator{color}*:
{code:java}
class DateSubConverter extends CustomizedConverter {
@Override
public RexNode convert(CallExpression call,
CallExpressionConvertRule.ConvertContext context) {
checkArgumentNumber(call, 2);
final RexNode child = context.toRexNode(call.getChildren().get(0));
final RelDataType targetRelDataType =
context.getTypeFactory()
.createFieldTypeFromLogicalType(call.getOutputDataType().getLogicalType());
return
context.getRelBuilder().getRexBuilder().makeAbstractCast(targetRelDataType,
child);
}
} {code}
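(To actually take effect, the converter would also need to be registered next to the existing converters in the planner; roughly like this, where the exact registry field name is my assumption:)
{code:java}
// Hypothetical registration, following how the other converters in
// org.apache.flink.table.planner.expressions.converter.converters are wired up.
// DATE_SUB refers to the BuiltInFunctionDefinition proposed above; the map name
// below is an assumption, not the verified Flink field.
CONVERTERS.put(BuiltInFunctionDefinitions.DATE_SUB, new DateSubConverter());
{code}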
3) Cast to DATE via *{color:#ff0000}ExpressionEvaluatorFactory.createEvaluator{color}*
and the built-in cast rules:
TimestampToDateCastRule/StringToDateCastRule cast the argument to DATE, and then
DataStructureConverters.getConverter(startDate) converts it to the internal date
type, which is int.
This way also needs some if/else branches to handle the different source types of
the cast, and it carries some useless code logic in the cast between
LocalDateTime and int.
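To make option 3 concrete, here is a rough sketch from my side (not tested; it assumes the SpecializedContext#createEvaluator / ExpressionEvaluator pattern that other runtime scalar functions use, and BuiltInFunctionDefinitions.DATE_SUB is the definition proposed above):
{code:java}
package org.apache.flink.table.runtime.functions.scalar;

import static org.apache.flink.table.api.Expressions.$;

import java.lang.invoke.MethodHandle;

import javax.annotation.Nullable;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.SpecializedFunction;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.FlinkRuntimeException;

public class DateSubFunction extends BuiltInScalarFunction {

    // generated by the planner; reuses TimestampToDateCastRule/StringToDateCastRule
    private final SpecializedFunction.ExpressionEvaluator castEvaluator;
    private transient MethodHandle castHandle;

    public DateSubFunction(SpecializedFunction.SpecializedContext context) {
        // assumes DATE_SUB has been added to BuiltInFunctionDefinitions as above
        super(BuiltInFunctionDefinitions.DATE_SUB, context);
        final DataType startDateType =
                context.getCallContext().getArgumentDataTypes().get(0);
        // let the built-in cast rules handle STRING/TIMESTAMP/DATE uniformly,
        // instead of the hand-written if/else chain of option 1
        // (may need the internal data type here, cf. other createEvaluator users)
        castEvaluator =
                context.createEvaluator(
                        $("startDate").cast(DataTypes.DATE()),
                        DataTypes.DATE(),
                        DataTypes.FIELD("startDate", startDateType.notNull()));
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        castHandle = castEvaluator.open(context);
    }

    public @Nullable Object eval(Object startDate, Object days) {
        if (startDate == null || days == null) {
            return null;
        }
        try {
            // internal representation of DATE is an int (days since epoch)
            final int start = (int) castHandle.invoke(startDate);
            return start - ((Number) days).intValue();
        } catch (Throwable t) {
            throw new FlinkRuntimeException(t);
        }
    }
}
{code}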
So which is the best way, and what is your suggestion?
> Add DATE_SUB supported in SQL & Table API
> -----------------------------------------
>
> Key: FLINK-26945
> URL: https://issues.apache.org/jira/browse/FLINK-26945
> Project: Flink
> Issue Type: Sub-task
> Components: Table SQL / API
> Reporter: dalongliu
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.17.0
>
>
> Returns the date {{numDays}} before {{startDate}}.
> Syntax:
> {code:java}
> date_sub(startDate, numDays) {code}
> Arguments:
> * {{startDate}}: A DATE expression.
> * {{numDays}}: An INTEGER expression.
> Returns:
> A DATE.
> If {{numDays}} is negative, abs(numDays) is added to {{startDate}}.
> If the result date overflows the date range, the function raises an error.
> Examples:
> {code:java}
> > SELECT date_sub('2016-07-30', 1);
> 2016-07-29 {code}
> See more:
> *
> [Spark|https://spark.apache.org/docs/latest/sql-ref-functions-builtin.html#date-and-timestamp-functions]
> * [Hive|https://cwiki.apache.org/confluence/display/hive/languagemanual+udf]