[ https://issues.apache.org/jira/browse/FLINK-5737?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
zhangjing updated FLINK-5737:
-----------------------------
    Description:
Currently, if a TableSource contains a field of type byte[], a TableException is thrown when the RelNode tree is optimized.

Suppose we run the following code, where logBlockTableSource contains one field, f0, of type byte[].
`
tableEnv.registerTableSource("t1", logBlockTableSource);
tableEnv.registerFunction("parse", new BinaryParser());
Table ttDatas = tableEnv.sql("select parse(f0) from t1");
DataStream<String> result = tableEnv.toDataStream(ttDatas, String.class);
result.addSink(new PrintSinkFunction<String>());
env.execute();

public static class BinaryParser extends ScalarFunction {
    public String eval(byte[] bytes) {
        return new String(bytes);
    }
}
`
We get the following exception:
`
Exception in thread "main" java.lang.AssertionError: Internal error: Error occurred while applying rule StreamTableSourceScanRule
	at org.apache.calcite.util.Util.newInternal(Util.java:792)
	at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:148)
	at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:225)
	at org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:117)
	at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:213)
	at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:819)
	at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:334)
	at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:264)
	at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:231)
	at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:259)
	at org.apache.flink.table.api.java.StreamTableEnvironment.toDataStream(StreamTableEnvironment.scala:148)
	at com.alibaba.blink.streaming.connectors.tt.examples.TT4TableSourceExample.main(TT4TableSourceExample.java:51)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: org.apache.flink.table.api.TableException: Unsupported data type encountered: ARRAY
	at org.apache.flink.table.api.TableException$.apply(exceptions.scala:51)
	at org.apache.flink.table.plan.nodes.FlinkRel$$anonfun$estimateRowSize$2.apply(FlinkRel.scala:124)
	at org.apache.flink.table.plan.nodes.FlinkRel$$anonfun$estimateRowSize$2.apply(FlinkRel.scala:108)
	at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
	at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
	at scala.collection.mutable.ArrayBuffer.foldLeft(ArrayBuffer.scala:47)
	at org.apache.flink.table.plan.nodes.FlinkRel$class.estimateRowSize(FlinkRel.scala:108)
	at org.apache.flink.table.plan.nodes.datastream.StreamScan.estimateRowSize(StreamScan.scala:37)
	at org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.computeSelfCost(StreamTableSourceScan.scala:46)
	at org.apache.calcite.rel.metadata.RelMdPercentageOriginalRows.getNonCumulativeCost(RelMdPercentageOriginalRows.java:162)
	at GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost_$(Unknown Source)
	at GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost(Unknown Source)
	at org.apache.calcite.rel.metadata.RelMetadataQuery.getNonCumulativeCost(RelMetadataQuery.java:258)
	at org.apache.calcite.plan.volcano.VolcanoPlanner.getCost(VolcanoPlanner.java:1128)
	at org.apache.calcite.plan.volcano.RelSubset.propagateCostImprovements0(RelSubset.java:336)
	at org.apache.calcite.plan.volcano.RelSubset.propagateCostImprovements(RelSubset.java:319)
	at org.apache.calcite.plan.volcano.VolcanoPlanner.addRelToSet(VolcanoPlanner.java:1830)
	at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1766)
	at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:1032)
	at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:1052)
	at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:1942)
	at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:136)
`
This happens because the byte[] TypeInformation is converted to ArrayRelDataType, but the estimateRowSize method in the FlinkRel class does not support fields of the ARRAY SqlType.

Solution:
It may be better to convert the byte[] TypeInformation to the BINARY or VARBINARY SqlType rather than to ArrayRelDataType. When estimating the size of a BINARY or VARBINARY field, we could use the same estimate as for a VARCHAR field.

> Fix the bug when TableSource contains a field of byte[] type
> ------------------------------------------------------------
>
>                 Key: FLINK-5737
>                 URL: https://issues.apache.org/jira/browse/FLINK-5737
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>            Reporter: zhangjing
>            Assignee: zhangjing
>

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
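
To make the proposed solution in the description concrete, here is a minimal, self-contained sketch of a row-size estimator in which BINARY and VARBINARY fields receive the same estimate as VARCHAR, while a type without an estimate still fails in the way the stack trace shows. This is not Flink's actual code: the class RowSizeSketch, its SqlType enum, and the byte estimates are all hypothetical and chosen only for illustration.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the planner's row-size estimation; not Flink's actual code.
final class RowSizeSketch {

    // Simplified set of SQL type names (assumption: only what this sketch needs).
    enum SqlType { INTEGER, BIGINT, VARCHAR, BINARY, VARBINARY, ARRAY }

    // Illustrative estimate in bytes for variable-length fields (an assumed value).
    static final double VARIABLE_LENGTH_ESTIMATE = 12.0;

    // Returns an estimated size in bytes for a single field of the given type.
    static double estimateFieldSize(SqlType type) {
        switch (type) {
            case INTEGER:
                return 4.0;
            case BIGINT:
                return 8.0;
            case VARCHAR:
            case BINARY:
            case VARBINARY:
                // Proposed change: binary fields get the same estimate as VARCHAR.
                return VARIABLE_LENGTH_ESTIMATE;
            default:
                // Types without an estimate (here: ARRAY) fail, as in the report.
                throw new IllegalArgumentException(
                        "Unsupported data type encountered: " + type);
        }
    }

    // Sums the per-field estimates to obtain the estimated row size.
    static double estimateRowSize(List<SqlType> fieldTypes) {
        double size = 0.0;
        for (SqlType type : fieldTypes) {
            size += estimateFieldSize(type);
        }
        return size;
    }

    public static void main(String[] args) {
        // A byte[] field mapped to VARBINARY now has a cost estimate.
        System.out.println(
                estimateRowSize(Arrays.asList(SqlType.INTEGER, SqlType.VARBINARY)));
    }
}
```

In this sketch a row containing a VARBINARY field can be costed, whereas a row containing an ARRAY field still raises the "Unsupported data type encountered" error, which is why mapping byte[] to a binary SQL type avoids the failure during optimization.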