[ 
https://issues.apache.org/jira/browse/FLINK-5737?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangjing updated FLINK-5737:
-----------------------------
    Description: 
At current, if a TableSource contains a field of byte[] type, TableException 
would be thrown when optimize RelNode tree.
If we run the following code, logBlockTableSource contain one field: f0,  which 
is  byte[] type.
 `
                tableEnv.registerTableSource("t1", logBlockTableSource);
                tableEnv.registerFunction("parse", new BinaryParser());

                Table ttDatas = tableEnv.sql("select parse(f0) from t1");
                DataStream<String> result = tableEnv.toDataStream(ttDatas, 
String.class);
                result.addSink(new PrintSinkFunction<String>());
                env.execute();

        public static class BinaryParser extends ScalarFunction {
                public String eval(byte[] bytes) {
                        return new String(bytes);
                }
        }
 `
we would get the following exception:
`Exception in thread "main" java.lang.AssertionError: Internal error: Error 
occurred while applying rule StreamTableSourceScanRule
        at org.apache.calcite.util.Util.newInternal(Util.java:792)
        at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:148)
        at 
org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:225)
        at 
org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:117)
        at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:213)
        at 
org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:819)
        at 
org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:334)
        at 
org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:264)
        at 
org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:231)
        at 
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:259)
        at 
org.apache.flink.table.api.java.StreamTableEnvironment.toDataStream(StreamTableEnvironment.scala:148)
        at 
com.alibaba.blink.streaming.connectors.tt.examples.TT4TableSourceExample.main(TT4TableSourceExample.java:51)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: org.apache.flink.table.api.TableException: Unsupported data type 
encountered: ARRAY
        at org.apache.flink.table.api.TableException$.apply(exceptions.scala:51)
        at 
org.apache.flink.table.plan.nodes.FlinkRel$$anonfun$estimateRowSize$2.apply(FlinkRel.scala:124)
        at 
org.apache.flink.table.plan.nodes.FlinkRel$$anonfun$estimateRowSize$2.apply(FlinkRel.scala:108)
        at 
scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
        at 
scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
        at scala.collection.mutable.ArrayBuffer.foldLeft(ArrayBuffer.scala:47)
        at 
org.apache.flink.table.plan.nodes.FlinkRel$class.estimateRowSize(FlinkRel.scala:108)
        at 
org.apache.flink.table.plan.nodes.datastream.StreamScan.estimateRowSize(StreamScan.scala:37)
        at 
org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.computeSelfCost(StreamTableSourceScan.scala:46)
        at 
org.apache.calcite.rel.metadata.RelMdPercentageOriginalRows.getNonCumulativeCost(RelMdPercentageOriginalRows.java:162)
        at 
GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost_$(Unknown 
Source)
        at 
GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost(Unknown Source)
        at 
org.apache.calcite.rel.metadata.RelMetadataQuery.getNonCumulativeCost(RelMetadataQuery.java:258)
        at 
org.apache.calcite.plan.volcano.VolcanoPlanner.getCost(VolcanoPlanner.java:1128)
        at 
org.apache.calcite.plan.volcano.RelSubset.propagateCostImprovements0(RelSubset.java:336)
        at 
org.apache.calcite.plan.volcano.RelSubset.propagateCostImprovements(RelSubset.java:319)
        at 
org.apache.calcite.plan.volcano.VolcanoPlanner.addRelToSet(VolcanoPlanner.java:1830)
        at 
org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1766)
        at 
org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:1032)
        at 
org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:1052)
        at 
org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:1942)
        at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:136)
`
  this is because byte[] typeInformation would transfer to ArrayRelDataType, 
but estimateRowSize method in FlinkRel class does not support Array SqlType 
field.

Solution:
Maybe it's better to transfer byte[] typeInformation to BINARY, VARBINARY 
SqlType rather than ArrayRelDataType sqlType. And when estimate  BINARY, 
VARBINARY field size, we could give the estimate value just like the VARCHAR 
value. 

> Fix the bug when TableSource contains a field of byte[] type
> ------------------------------------------------------------
>
>                 Key: FLINK-5737
>                 URL: https://issues.apache.org/jira/browse/FLINK-5737
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>            Reporter: zhangjing
>            Assignee: zhangjing
>
> At current, if a TableSource contains a field of byte[] type, TableException 
> would be thrown when optimize RelNode tree.
> If we run the following code, logBlockTableSource contain one field: f0,  
> which is  byte[] type.
>  `
>               tableEnv.registerTableSource("t1", logBlockTableSource);
>               tableEnv.registerFunction("parse", new BinaryParser());
>               Table ttDatas = tableEnv.sql("select parse(f0) from t1");
>               DataStream<String> result = tableEnv.toDataStream(ttDatas, 
> String.class);
>               result.addSink(new PrintSinkFunction<String>());
>               env.execute();
>       public static class BinaryParser extends ScalarFunction {
>               public String eval(byte[] bytes) {
>                       return new String(bytes);
>               }
>       }
>  `
> we would get the following exception:
> `Exception in thread "main" java.lang.AssertionError: Internal error: Error 
> occurred while applying rule StreamTableSourceScanRule
>       at org.apache.calcite.util.Util.newInternal(Util.java:792)
>       at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:148)
>       at 
> org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:225)
>       at 
> org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:117)
>       at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:213)
>       at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:819)
>       at 
> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:334)
>       at 
> org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:264)
>       at 
> org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:231)
>       at 
> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:259)
>       at 
> org.apache.flink.table.api.java.StreamTableEnvironment.toDataStream(StreamTableEnvironment.scala:148)
>       at 
> com.alibaba.blink.streaming.connectors.tt.examples.TT4TableSourceExample.main(TT4TableSourceExample.java:51)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> Caused by: org.apache.flink.table.api.TableException: Unsupported data type 
> encountered: ARRAY
>       at org.apache.flink.table.api.TableException$.apply(exceptions.scala:51)
>       at 
> org.apache.flink.table.plan.nodes.FlinkRel$$anonfun$estimateRowSize$2.apply(FlinkRel.scala:124)
>       at 
> org.apache.flink.table.plan.nodes.FlinkRel$$anonfun$estimateRowSize$2.apply(FlinkRel.scala:108)
>       at 
> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
>       at 
> scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
>       at scala.collection.mutable.ArrayBuffer.foldLeft(ArrayBuffer.scala:47)
>       at 
> org.apache.flink.table.plan.nodes.FlinkRel$class.estimateRowSize(FlinkRel.scala:108)
>       at 
> org.apache.flink.table.plan.nodes.datastream.StreamScan.estimateRowSize(StreamScan.scala:37)
>       at 
> org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.computeSelfCost(StreamTableSourceScan.scala:46)
>       at 
> org.apache.calcite.rel.metadata.RelMdPercentageOriginalRows.getNonCumulativeCost(RelMdPercentageOriginalRows.java:162)
>       at 
> GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost_$(Unknown 
> Source)
>       at 
> GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost(Unknown 
> Source)
>       at 
> org.apache.calcite.rel.metadata.RelMetadataQuery.getNonCumulativeCost(RelMetadataQuery.java:258)
>       at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.getCost(VolcanoPlanner.java:1128)
>       at 
> org.apache.calcite.plan.volcano.RelSubset.propagateCostImprovements0(RelSubset.java:336)
>       at 
> org.apache.calcite.plan.volcano.RelSubset.propagateCostImprovements(RelSubset.java:319)
>       at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.addRelToSet(VolcanoPlanner.java:1830)
>       at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1766)
>       at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:1032)
>       at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:1052)
>       at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:1942)
>       at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:136)
> `
>   this is because byte[] typeInformation would transfer to ArrayRelDataType, 
> but estimateRowSize method in FlinkRel class does not support Array SqlType 
> field.
> Solution:
> Maybe it's better to transfer byte[] typeInformation to BINARY, VARBINARY 
> SqlType rather than ArrayRelDataType sqlType. And when estimate  BINARY, 
> VARBINARY field size, we could give the estimate value just like the VARCHAR 
> value. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to