[ https://issues.apache.org/jira/browse/FLINK-21565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17441496#comment-17441496 ]

Shen Zhu commented on FLINK-21565:
----------------------------------

Hi Nico, I'm interested in working on this ticket. Would you mind assigning it
to me?

> Support more integer types in TIMESTAMPADD
> ------------------------------------------
>
>                 Key: FLINK-21565
>                 URL: https://issues.apache.org/jira/browse/FLINK-21565
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table SQL / API
>    Affects Versions: 1.12.1
>            Reporter: Nico Kruber
>            Priority: Not a Priority
>              Labels: auto-deprioritized-major, auto-deprioritized-minor
>         Attachments: flights-21565.csv
>
>
> At the moment, {{TIMESTAMPADD}} does not seem to support {{SMALLINT}} or
> {{TINYINT}} arguments, although these should be perfectly suitable for
> automatic conversion (in contrast to {{BIGINT}} or floating-point numbers,
> where I would expect the user to cast explicitly).
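> For reference, the same error can presumably be reproduced without the
> attachment with a single statement such as the following (a minimal,
> unverified sketch using arbitrary literal values):
> {code}
> SELECT TIMESTAMPADD(MINUTE, CAST(5 AS TINYINT), TIMESTAMP '2021-01-01 10:00:00');
> {code}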
> With the attached file, executing these lines
> {code}
> CREATE TABLE `flights` (
>   `_YEAR` CHAR(4),
>   `_MONTH` CHAR(2),
>   `_DAY` CHAR(2),
>   `_SCHEDULED_DEPARTURE` CHAR(4),
>   `SCHEDULED_DEPARTURE` AS TO_TIMESTAMP(
>       `_YEAR` || '-' || `_MONTH` || '-' || `_DAY` || ' ' ||
>       SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 0 FOR 2) || ':' ||
>       SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 3) || ':00'),
>   `_DEPARTURE_TIME` CHAR(4),
>   `DEPARTURE_DELAY` TINYINT,
>   `DEPARTURE_TIME` AS TIMESTAMPADD(MINUTE, `DEPARTURE_DELAY`,
>       TO_TIMESTAMP(
>           `_YEAR` || '-' || `_MONTH` || '-' || `_DAY` || ' ' ||
>           SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 0 FOR 2) || ':' ||
>           SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 3) || ':00'))
> ) WITH (
>   'connector' = 'filesystem',
>   'path' = 'file:///tmp/kaggle-flight-delay/flights-21565.csv',
>   'format' = 'csv'
> );
> SELECT * FROM flights;
> {code}
> currently fails with the following exception (similarly for {{SMALLINT}}):
> {code}
> org.apache.flink.table.planner.codegen.CodeGenException: Unsupported casting from TINYINT to INTERVAL SECOND(3).
>       at org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens$.numericCasting(ScalarOperatorGens.scala:2352) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
>       at org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens$.generateBinaryArithmeticOperator(ScalarOperatorGens.scala:93) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
>       at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:590) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
>       at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:529) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
>       at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:56) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
>       at org.apache.calcite.rex.RexCall.accept(RexCall.java:174) ~[flink-table_2.12-1.12.1.jar:1.12.1]
>       at org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$visitCall$2(ExprCodeGenerator.scala:526) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
>       at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>       at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>       at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>       at scala.collection.TraversableLike.map(TraversableLike.scala:233) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>       at scala.collection.TraversableLike.map$(TraversableLike.scala:226) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>       at scala.collection.AbstractTraversable.map(Traversable.scala:104) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>       at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:517) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
>       at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:56) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
>       at org.apache.calcite.rex.RexCall.accept(RexCall.java:174) ~[flink-table_2.12-1.12.1.jar:1.12.1]
>       at org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$visitCall$2(ExprCodeGenerator.scala:526) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
>       at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>       at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>       at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>       at scala.collection.TraversableLike.map(TraversableLike.scala:233) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>       at scala.collection.TraversableLike.map$(TraversableLike.scala:226) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>       at scala.collection.AbstractTraversable.map(Traversable.scala:104) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>       at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:517) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
>       at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:56) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
>       at org.apache.calcite.rex.RexCall.accept(RexCall.java:174) ~[flink-table_2.12-1.12.1.jar:1.12.1]
>       at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:155) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
>       at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.$anonfun$generateProcessCode$5(CalcCodeGenerator.scala:143) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
>       at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>       at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>       at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>       at scala.collection.TraversableLike.map(TraversableLike.scala:233) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>       at scala.collection.TraversableLike.map$(TraversableLike.scala:226) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>       at scala.collection.AbstractTraversable.map(Traversable.scala:104) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>       at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.produceProjectionCode$1(CalcCodeGenerator.scala:143) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
>       at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateProcessCode(CalcCodeGenerator.scala:169) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
>       at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:59) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
>       at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:84) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
>       at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
>       at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
>       at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
>       at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
>       at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:158) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
>       at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:82) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
>       at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
>       at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
>       at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
>       at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
>       at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:66) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
>       at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>       at scala.collection.Iterator.foreach(Iterator.scala:937) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>       at scala.collection.Iterator.foreach$(Iterator.scala:937) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>       at scala.collection.IterableLike.foreach(IterableLike.scala:70) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>       at scala.collection.IterableLike.foreach$(IterableLike.scala:69) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>       at scala.collection.AbstractIterable.foreach(Iterable.scala:54) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>       at scala.collection.TraversableLike.map(TraversableLike.scala:233) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>       at scala.collection.TraversableLike.map$(TraversableLike.scala:226) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>       at scala.collection.AbstractTraversable.map(Traversable.scala:104) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>       at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:65) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
>       at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:167) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
>       at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329) ~[flink-table_2.12-1.12.1.jar:1.12.1]
>       at org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1321) ~[flink-table_2.12-1.12.1.jar:1.12.1]
>       at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.getPipeline(StreamTableEnvironmentImpl.java:328) ~[flink-table_2.12-1.12.1.jar:1.12.1]
>       at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$createPipeline$1(ExecutionContext.java:287) ~[flink-sql-client_2.12-1.12.1.jar:1.12.1]
>       at org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:256) ~[flink-sql-client_2.12-1.12.1.jar:1.12.1]
>       at org.apache.flink.table.client.gateway.local.ExecutionContext.createPipeline(ExecutionContext.java:282) ~[flink-sql-client_2.12-1.12.1.jar:1.12.1]
>       at org.apache.flink.table.client.gateway.local.LocalExecutor.executeQueryInternal(LocalExecutor.java:542) ~[flink-sql-client_2.12-1.12.1.jar:1.12.1]
>       ... 8 more
> {code}
> As the topmost stack frame shows, code generation fails when trying to cast
> the {{TINYINT}} operand to {{INTERVAL SECOND(3)}}. The current workaround is
> to manually cast the value to INT:
> {code}
> CREATE TABLE `flights` (
>   `_YEAR` CHAR(4),
>   `_MONTH` CHAR(2),
>   `_DAY` CHAR(2),
>   `_SCHEDULED_DEPARTURE` CHAR(4),
>   `SCHEDULED_DEPARTURE` AS TO_TIMESTAMP(
>       `_YEAR` || '-' || `_MONTH` || '-' || `_DAY` || ' ' ||
>       SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 0 FOR 2) || ':' ||
>       SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 3) || ':00'),
>   `_DEPARTURE_TIME` CHAR(4),
>   `DEPARTURE_DELAY` SMALLINT,
>   `DEPARTURE_TIME` AS TIMESTAMPADD(MINUTE, CAST(`DEPARTURE_DELAY` AS INT),
>       TO_TIMESTAMP(
>           `_YEAR` || '-' || `_MONTH` || '-' || `_DAY` || ' ' ||
>           SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 0 FOR 2) || ':' ||
>           SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 3) || ':00'))
> ) WITH (
>   'connector' = 'filesystem',
>   'path' = 'file:///tmp/kaggle-flight-delay/flights-21565.csv',
>   'format' = 'csv'
> );
> {code}
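> The same cast-based workaround should also work in plain queries; for a
> hypothetical table {{t}} with a {{TINYINT}} column {{delay}} and a
> {{TIMESTAMP}} column {{ts}} (names invented for illustration), it would look
> like:
> {code}
> SELECT TIMESTAMPADD(MINUTE, CAST(`delay` AS INT), `ts`) FROM t;
> {code}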



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
