[ 
https://issues.apache.org/jira/browse/FLINK-15363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17011343#comment-17011343
 ] 

Zhenghua Gao commented on FLINK-15363:
--------------------------------------

Currently *HBaseTableSchema* uses TypeInformation to specify an HBase table's 
schema, which loses the declared precision/scale for types such as 
DECIMAL(10, 4) and TIMESTAMP(9). 
Meanwhile, *HBaseTypeUtils* serializes a java.sql.Timestamp as the bytes of a 
long, which loses precision for TIMESTAMP types.

The HBase connector should use the new type system to fix this.
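
To illustrate the TIMESTAMP part, here is a minimal, standalone sketch of what 
happens when a java.sql.Timestamp is round-tripped through a long. It does not 
use the connector code: the encode/decode helpers are hypothetical stand-ins, 
and it assumes the long is the epoch-millisecond value from getTime().

```java
import java.nio.ByteBuffer;
import java.sql.Timestamp;

public class TimestampPrecisionLoss {

    // Hypothetical stand-in for the connector's serialization:
    // assumes the timestamp is stored as its epoch-millisecond long (8 bytes).
    static byte[] encode(Timestamp ts) {
        return ByteBuffer.allocate(Long.BYTES).putLong(ts.getTime()).array();
    }

    // Hypothetical inverse: rebuild the timestamp from the stored long.
    static Timestamp decode(byte[] bytes) {
        return new Timestamp(ByteBuffer.wrap(bytes).getLong());
    }

    public static void main(String[] args) {
        // A value with nanosecond precision, as TIMESTAMP(9) would allow.
        Timestamp original = Timestamp.valueOf("2020-01-01 12:34:56.123456789");
        Timestamp roundTripped = decode(encode(original));

        System.out.println("original nanos:      " + original.getNanos());
        System.out.println("round-tripped nanos: " + roundTripped.getNanos());
        // Everything below millisecond resolution is gone after the round trip.
        System.out.println("equal: " + original.equals(roundTripped));
    }
}
```

Under that assumption, only millisecond resolution survives, so a column 
declared as TIMESTAMP(9) effectively behaves like TIMESTAMP(3) after a write 
and read.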

> Hbase connector do not support datatypes with precision like TIMESTAMP(9) and 
> DECIMAL(10,4)
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-15363
>                 URL: https://issues.apache.org/jira/browse/FLINK-15363
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / HBase
>    Affects Versions: 1.10.0
>            Reporter: Leonard Xu
>            Priority: Major
>             Fix For: 1.10.0
>
>
> {code:java}
> // exception msg
> rowtype of new rel:rowtype of new rel:RecordType(VARCHAR(2147483647) 
> CHARACTER SET "UTF-16LE" order_id, VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE" item, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" currency, 
> DECIMAL(10, 4) amount, TIMESTAMP(3) order_time, TIME ATTRIBUTE(PROCTIME) NOT 
> NULL proc_time, DECIMAL(20, 4) amount_kg, TIME ATTRIBUTE(ROWTIME) ts, BIGINT 
> currency_id, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" currency_name, 
> DECIMAL(38, 4) rate, TIMESTAMP(3) currency_time, VARCHAR(2147483647) 
> CHARACTER SET "UTF-16LE" country, VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE" rowkey, RecordType:peek_no_expand(INTEGER country_id, 
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" country_name, 
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" country_name_cn, 
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" currency, VARCHAR(2147483647) 
> CHARACTER SET "UTF-16LE" region_name) f1, 
> RecordType:peek_no_expand(TIMESTAMP(3) record_timestamp3, TIMESTAMP(3) 
> record_timestamp9, TIME(0) time3, TIME(0) time9, DECIMAL(38, 18) gdp) f2) NOT 
> NULLrowtype of set:RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
> order_id, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" item, 
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" currency, DECIMAL(10, 4) amount, 
> TIMESTAMP(3) order_time, TIME ATTRIBUTE(PROCTIME) NOT NULL proc_time, 
> DECIMAL(20, 4) amount_kg, TIME ATTRIBUTE(ROWTIME) ts, BIGINT currency_id, 
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" currency_name, DECIMAL(38, 4) 
> rate, TIMESTAMP(3) currency_time, VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE" country, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" rowkey, 
> RecordType:peek_no_expand(INTEGER country_id, VARCHAR(2147483647) CHARACTER 
> SET "UTF-16LE" country_name, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
> country_name_cn, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" currency, 
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" region_name) f1, 
> RecordType:peek_no_expand(TIMESTAMP(3) record_timestamp3, TIMESTAMP(9) 
> record_timestamp9, TIME(0) time3, TIME(0) time9, DECIMAL(10, 4) gdp) f2) NOT 
> NULL at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:84)
>  at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>  at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>  at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>  at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:893) at 
> scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at 
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at 
> scala.collection.AbstractIterable.foreach(Iterable.scala:54) at 
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) at 
> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
>  at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:167)
>  at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:89)
>  at 
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>  at 
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:223)
>  at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
>  at 
> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
>  at 
> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
>  at 
> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250)
>  at 
> job.KafkaJoinHbase2Hbase.testJoinHbaseWithPrecision(KafkaJoinHbase2Hbase.java:109)
>  at job.KafkaJoinHbase2Hbase.main(KafkaJoinHbase2Hbase.java:24)Caused by: 
> java.lang.AssertionError: Type mismatch:rowtype of new 
> rel:RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" order_id, 
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" item, VARCHAR(2147483647) 
> CHARACTER SET "UTF-16LE" currency, DECIMAL(10, 4) amount, TIMESTAMP(3) 
> order_time, TIME ATTRIBUTE(PROCTIME) NOT NULL proc_time, DECIMAL(20, 4) 
> amount_kg, TIME ATTRIBUTE(ROWTIME) ts, BIGINT currency_id, 
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" currency_name, DECIMAL(38, 4) 
> rate, TIMESTAMP(3) currency_time, VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE" country, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" rowkey, 
> RecordType:peek_no_expand(INTEGER country_id, VARCHAR(2147483647) CHARACTER 
> SET "UTF-16LE" country_name, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
> country_name_cn, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" currency, 
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" region_name) f1, 
> RecordType:peek_no_expand(TIMESTAMP(3) record_timestamp3, TIMESTAMP(3) 
> record_timestamp9, TIME(0) time3, TIME(0) time9, DECIMAL(38, 18) gdp) f2) NOT 
> NULLrowtype of set:RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
> order_id, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" item, 
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" currency, DECIMAL(10, 4) amount, 
> TIMESTAMP(3) order_time, TIME ATTRIBUTE(PROCTIME) NOT NULL proc_time, 
> DECIMAL(20, 4) amount_kg, TIME ATTRIBUTE(ROWTIME) ts, BIGINT currency_id, 
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" currency_name, DECIMAL(38, 4) 
> rate, TIMESTAMP(3) currency_time, VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE" country, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" rowkey, 
> RecordType:peek_no_expand(INTEGER country_id, VARCHAR(2147483647) CHARACTER 
> SET "UTF-16LE" country_name, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
> country_name_cn, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" currency, 
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" region_name) f1, 
> RecordType:peek_no_expand(TIMESTAMP(3) record_timestamp3, TIMESTAMP(9) 
> record_timestamp9, TIME(0) time3, TIME(0) time9, DECIMAL(10, 4) gdp) f2) NOT 
> NULL at org.apache.calcite.util.Litmus$1.fail(Litmus.java:31) at 
> org.apache.calcite.plan.RelOptUtil.equal(RelOptUtil.java:2026) at 
> org.apache.calcite.plan.volcano.RelSubset.add(RelSubset.java:284) at 
> org.apache.calcite.plan.volcano.RelSet.add(RelSet.java:148) at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.addRelToSet(VolcanoPlanner.java:1818)
>  at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1764)
>  at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
>  at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
>  at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:1939)
>  at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:129)
>  at 
> org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:236) 
> at 
> org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnTableScanRule.onMatch(CommonLookupJoinRule.scala:142)
>  at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:208)
>  at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:631)
>  at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:327) 
> at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
>  ... 21 more
> Process finished with exit code 1
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
