[
https://issues.apache.org/jira/browse/FLINK-13578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Flink Jira Bot updated FLINK-13578:
-----------------------------------
Labels: stale-critical (was: )
> Blink throws exception when using Types.INTERVAL_MILLIS in TableSource
> ----------------------------------------------------------------------
>
> Key: FLINK-13578
> URL: https://issues.apache.org/jira/browse/FLINK-13578
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.9.0
> Reporter: Wei Zhong
> Priority: Critical
> Labels: stale-critical
>
> Running this program will throw a TableException:
> {code:java}
> object Tests {
> class MyTableSource extends InputFormatTableSource[java.lang.Long] {
> val data = new java.util.ArrayList[java.lang.Long]()
> data.add(1L)
> data.add(2L)
> data.add(3L)
> val dataType = Types.INTERVAL_MILLIS()
> val inputFormat = new CollectionInputFormat[java.lang.Long](
> data, dataType.createSerializer(new ExecutionConfig))
> override def getInputFormat: InputFormat[java.lang.Long, _ <: InputSplit]
> = inputFormat
> override def getTableSchema: TableSchema =
> TableSchema.fromTypeInfo(dataType)
> override def getReturnType: TypeInformation[java.lang.Long] = dataType
> }
> def main(args: Array[String]): Unit = {
> val tenv = TableEnvironmentImpl.create(
>
> EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build())
> val table = tenv.fromTableSource(new MyTableSource)
> tenv.registerTableSink("sink", Array("f0"),
> Array(Types.INTERVAL_MILLIS()), new CsvTableSink("/tmp/results"))
> table.select("f0").insertInto("sink")
> tenv.execute("test")
> }
> }
> {code}
> The TableException detail:
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.TableException:
> Unsupported conversion from data type 'INTERVAL SECOND(3)' (conversion class:
> java.time.Duration) to type information. Only data types that originated from
> type information fully support a reverse conversion.
> at
> org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(LegacyTypeInfoDataTypeConverter.java:242)
> at
> org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo(TypeConversions.java:49)
> at
> org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(TypeInfoDataTypeConverter.java:145)
> at
> org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromLogicalTypeToTypeInfo(TypeInfoLogicalTypeConverter.java:58)
> at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> at
> java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
> at
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:545)
> at
> java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438)
> at
> org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo.<init>(BaseRowTypeInfo.java:64)
> at
> org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo.of(BaseRowTypeInfo.java:210)
> at
> org.apache.flink.table.planner.plan.utils.ScanUtil$.convertToInternalRow(ScanUtil.scala:126)
> at
> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan.translateToPlanInternal(BatchExecTableSourceScan.scala:112)
> at
> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan.translateToPlanInternal(BatchExecTableSourceScan.scala:48)
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
> at
> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan.translateToPlan(BatchExecTableSourceScan.scala:48)
> at
> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToTransformation(BatchExecSink.scala:127)
> at
> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.scala:92)
> at
> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.scala:50)
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
> at
> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlan(BatchExecSink.scala:50)
> at
> org.apache.flink.table.planner.delegation.BatchPlanner$$anonfun$translateToPlan$1.apply(BatchPlanner.scala:70)
> at
> org.apache.flink.table.planner.delegation.BatchPlanner$$anonfun$translateToPlan$1.apply(BatchPlanner.scala:69)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at
> org.apache.flink.table.planner.delegation.BatchPlanner.translateToPlan(BatchPlanner.scala:69)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:149)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:439)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:408)
> at
> org.apache.flink.table.planner.runtime.batch.table.Tests$.main(CalcITCase.scala:711)
> at
> org.apache.flink.table.planner.runtime.batch.table.Tests.main(CalcITCase.scala)
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)