[
https://issues.apache.org/jira/browse/FLINK-17224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17323032#comment-17323032
]
Flink Jira Bot commented on FLINK-17224:
----------------------------------------
This issue is assigned but has not received an update in 7 days so it has been
labeled "stale-assigned". If you are still working on the issue, please give an
update and remove the label. If you are no longer working on the issue, please
unassign so someone else may work on it. In 7 days the issue will be
automatically unassigned.
> Precision of TIME type does not work correctly
> ----------------------------------------------
>
> Key: FLINK-17224
> URL: https://issues.apache.org/jira/browse/FLINK-17224
> Project: Flink
> Issue Type: Sub-task
> Components: Table SQL / Planner
> Reporter: Dawid Wysakowicz
> Assignee: Danny Chen
> Priority: Critical
> Labels: stale-assigned
>
> The support for precision in TIME type does not work correctly causing many
> different often cryptic problems.
> Precision is completely ignored in {{FlinkTypeFactory:440-446}}:
> {code}
> case TIME =>
> if (relDataType.getPrecision > 3) {
> throw new TableException(
> s"TIME precision is not supported: ${relDataType.getPrecision}")
> }
> // blink runner support precision 3, but for consistent with flink
> runner, we set to 0.
> new TimeType()
> {code}
> Example problem:
> {code}
> @Test
> public void testTimeScalarFunction() throws Exception {
> int nanoOfDay = 10 * 1_000_000;
> final List<Row> sourceData = Collections.singletonList(
> Row.of(LocalTime.ofNanoOfDay(nanoOfDay))
> );
> final List<Row> sinkData = Arrays.asList(
> Row.of(nanoOfDay)
> );
> TestCollectionTableFactory.reset();
> TestCollectionTableFactory.initData(sourceData);
> tEnv().sqlUpdate("CREATE TABLE SourceTable(s TIME(2)) WITH ('connector'
> = 'COLLECTION')");
> tEnv().sqlUpdate("CREATE TABLE SinkTable(s BIGINT) WITH ('connector' =
> 'COLLECTION')");
> tEnv().from("SourceTable")
> .select(call(new TimeScalarFunction(), $("s")))
> .insertInto("SinkTable");
> tEnv().execute("Test Job");
> assertThat(TestCollectionTableFactory.getResult(), equalTo(sinkData));
> }
> public static class TimeScalarFunction extends ScalarFunction {
> public Long eval(@DataTypeHint("TIME(1)") LocalTime time) {
> return time.toNanoOfDay();
> }
> }
> {code}
> fails with:
> {code}
> org.apache.flink.table.api.ValidationException: Invalid function call:
> org$apache$flink$table$planner$runtime$stream$table$FunctionITCase$TimeScalarFunction$a19cd231ba10cbbc0b55ebeda49e2a77(TIME(0))
> at
> org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidCallException(TypeInferenceUtil.java:198)
> at
> org.apache.flink.table.planner.functions.inference.TypeInferenceReturnInference.inferReturnType(TypeInferenceReturnInference.java:73)
> at
> org.apache.calcite.sql.SqlOperator.inferReturnType(SqlOperator.java:486)
> at
> org.apache.calcite.rex.RexBuilder.deriveReturnType(RexBuilder.java:277)
> at org.apache.calcite.tools.RelBuilder.call(RelBuilder.java:576)
> at org.apache.calcite.tools.RelBuilder.call(RelBuilder.java:583)
> at
> org.apache.flink.table.planner.expressions.converter.FunctionDefinitionConvertRule.convert(FunctionDefinitionConvertRule.java:67)
> at
> org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:97)
> at
> org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:72)
> at
> org.apache.flink.table.expressions.CallExpression.accept(CallExpression.java:122)
> at
> org.apache.flink.table.planner.plan.QueryOperationConverter.convertExprToRexNode(QueryOperationConverter.java:681)
> at
> org.apache.flink.table.planner.plan.QueryOperationConverter.access$800(QueryOperationConverter.java:128)
> at
> org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.lambda$convertToRexNodes$2(QueryOperationConverter.java:487)
> at
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> at
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> at
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> at
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
> at
> org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.convertToRexNodes(QueryOperationConverter.java:488)
> at
> org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:152)
> at
> org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:148)
> at
> org.apache.flink.table.operations.ProjectQueryOperation.accept(ProjectQueryOperation.java:75)
> at
> org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:145)
> at
> org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:127)
> at
> org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:46)
> at
> org.apache.flink.table.operations.ProjectQueryOperation.accept(ProjectQueryOperation.java:75)
> at
> org.apache.flink.table.planner.calcite.FlinkRelBuilder.queryOperation(FlinkRelBuilder.scala:176)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:188)
> at
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
> at
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:761)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:753)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:720)
> at
> org.apache.flink.table.planner.runtime.stream.table.FunctionITCase.testTimeScalarFunction(FunctionITCase.java:151)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at
> org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:239)
> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> at
> org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:239)
> at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> at
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
> at
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
> at
> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
> at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
> Caused by: org.apache.flink.table.api.ValidationException: Could not infer an
> output type for the given arguments.
> at
> org.apache.flink.table.types.inference.TypeInferenceUtil.inferOutputType(TypeInferenceUtil.java:146)
> at
> org.apache.flink.table.planner.functions.inference.TypeInferenceReturnInference.inferReturnTypeOrError(TypeInferenceReturnInference.java:82)
> at
> org.apache.flink.table.planner.functions.inference.TypeInferenceReturnInference.inferReturnType(TypeInferenceReturnInference.java:70)
> ... 74 more
> {code}
> The problem is that after an input type inference we apply a cast {{cast("a"
> AS TIME(1))}} , but when to and from {{RelDataType}} this expression is
> converted to {{cast("a" AS TIME(0))}} and we end up again with wrong type
> which results in failling {{MappingTypeStrategy}}.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)