[ https://issues.apache.org/jira/browse/FLINK-5737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15864852#comment-15864852 ]

zhangjing commented on FLINK-5737:
----------------------------------

Hi Fabian, thanks a lot. The bug has already been fixed, so I will close this JIRA.

> Fix the bug when TableSource contains a field of byte[] type
> ------------------------------------------------------------
>
>                 Key: FLINK-5737
>                 URL: https://issues.apache.org/jira/browse/FLINK-5737
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>            Reporter: zhangjing
>            Assignee: zhangjing
>
> Currently, if a TableSource contains a field of type byte[], a TableException 
> is thrown while the RelNode tree is optimized.
> If we run the following code, where logBlockTableSource contains a single 
> field f0 of type byte[]:
> {code}
> // register the source (one field f0 of type byte[]) and the scalar UDF
> tableEnv.registerTableSource("t1", logBlockTableSource);
> tableEnv.registerFunction("parse", new BinaryParser());
> // query the byte[] field through the scalar function
> Table ttDatas = tableEnv.sql("select parse(f0) from t1");
> // translating the Table triggers the optimizer and thus the exception
> DataStream<String> result = tableEnv.toDataStream(ttDatas, String.class);
> result.addSink(new PrintSinkFunction<String>());
> env.execute();
>
> public static class BinaryParser extends ScalarFunction {
>     public String eval(byte[] bytes) {
>         return new String(bytes);
>     }
> }
> {code}
> we get the following exception:
> {code}
> Exception in thread "main" java.lang.AssertionError: Internal error: Error occurred while applying rule StreamTableSourceScanRule
>     at org.apache.calcite.util.Util.newInternal(Util.java:792)
>     at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:148)
>     at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:225)
>     at org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:117)
>     at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:213)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:819)
>     at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:334)
>     at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:264)
>     at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:231)
>     at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:259)
>     at org.apache.flink.table.api.java.StreamTableEnvironment.toDataStream(StreamTableEnvironment.scala:148)
>     at com.alibaba.blink.streaming.connectors.tt.examples.TT4TableSourceExample.main(TT4TableSourceExample.java:51)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> Caused by: org.apache.flink.table.api.TableException: Unsupported data type encountered: ARRAY
>     at org.apache.flink.table.api.TableException$.apply(exceptions.scala:51)
>     at org.apache.flink.table.plan.nodes.FlinkRel$$anonfun$estimateRowSize$2.apply(FlinkRel.scala:124)
>     at org.apache.flink.table.plan.nodes.FlinkRel$$anonfun$estimateRowSize$2.apply(FlinkRel.scala:108)
>     at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
>     at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
>     at scala.collection.mutable.ArrayBuffer.foldLeft(ArrayBuffer.scala:47)
>     at org.apache.flink.table.plan.nodes.FlinkRel$class.estimateRowSize(FlinkRel.scala:108)
>     at org.apache.flink.table.plan.nodes.datastream.StreamScan.estimateRowSize(StreamScan.scala:37)
>     at org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.computeSelfCost(StreamTableSourceScan.scala:46)
>     at org.apache.calcite.rel.metadata.RelMdPercentageOriginalRows.getNonCumulativeCost(RelMdPercentageOriginalRows.java:162)
>     at GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost_$(Unknown Source)
>     at GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost(Unknown Source)
>     at org.apache.calcite.rel.metadata.RelMetadataQuery.getNonCumulativeCost(RelMetadataQuery.java:258)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.getCost(VolcanoPlanner.java:1128)
>     at org.apache.calcite.plan.volcano.RelSubset.propagateCostImprovements0(RelSubset.java:336)
>     at org.apache.calcite.plan.volcano.RelSubset.propagateCostImprovements(RelSubset.java:319)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.addRelToSet(VolcanoPlanner.java:1830)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1766)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:1032)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:1052)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:1942)
>     at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:136)
> {code}
> This happens because every PrimitiveArrayTypeInfo (covering byte[], boolean[], 
> short[], int[], long[], float[], double[], and char[]) is translated to an 
> ArrayRelDataType, but the estimateRowSize method in the FlinkRel class does 
> not support fields of the ARRAY SqlType, as sketched below.
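> For context, a minimal Scala sketch of the kind of fold estimateRowSize 
> performs; the per-type size constants and the exact set of cases here are 
> illustrative assumptions, not the actual FlinkRel code:
> {code}
> import org.apache.calcite.rel.`type`.RelDataType
> import org.apache.calcite.sql.`type`.SqlTypeName._
> import org.apache.flink.table.api.TableException
>
> // Hedged sketch: sum a per-type size estimate over the row's field types.
> // An ARRAY field (the translation of byte[]) falls through to the default
> // case and aborts cost estimation with the exception seen above.
> def estimateRowSize(fieldTypes: Seq[RelDataType]): Double =
>   fieldTypes.foldLeft(0.0) { (acc, t) =>
>     t.getSqlTypeName match {
>       case TINYINT | BOOLEAN => acc + 1
>       case INTEGER | FLOAT   => acc + 4
>       case BIGINT | DOUBLE   => acc + 8
>       case VARCHAR           => acc + 12 // assumed VARCHAR estimate
>       case other =>
>         throw TableException(s"Unsupported data type encountered: $other")
>     }
>   }
> {code}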
> Solution:
> It may be better to translate the byte[] type information to the BINARY or 
> VARBINARY SqlType rather than to ArrayRelDataType. When estimating the size 
> of a BINARY or VARBINARY field, we could then use the same estimate as for a 
> VARCHAR field, as in the sketch below.
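> A rough Scala sketch of that direction; the helper and its integration point 
> in FlinkTypeFactory are assumptions for illustration, not the final patch:
> {code}
> import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
> import org.apache.calcite.sql.`type`.SqlTypeName
> import org.apache.flink.api.common.typeinfo.{PrimitiveArrayTypeInfo, TypeInformation}
>
> // Hypothetical helper: translate byte[] type information to VARBINARY
> // instead of an ARRAY type, so estimateRowSize can price it like VARCHAR.
> def toRelDataType(
>     typeFactory: RelDataTypeFactory,
>     typeInfo: TypeInformation[_]): RelDataType = typeInfo match {
>   case PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO =>
>     typeFactory.createSqlType(SqlTypeName.VARBINARY)
>   case other =>
>     // all other types keep their existing translation (not sketched here)
>     throw new UnsupportedOperationException(s"not covered in this sketch: $other")
> }
>
> // estimateRowSize would then handle the new type alongside VARCHAR, e.g.:
> //   case VARCHAR | BINARY | VARBINARY => acc + 12
> {code}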


