[ 
https://issues.apache.org/jira/browse/FLINK-17847?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu updated FLINK-17847:
-------------------------------
    Fix Version/s:     (was: 1.10.0)
                   1.12.0

> ArrayIndexOutOfBoundsException happens when codegen StreamExec operator
> -----------------------------------------------------------------------
>
>                 Key: FLINK-17847
>                 URL: https://issues.apache.org/jira/browse/FLINK-17847
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.10.0, 1.11.0
>            Reporter: Leonard Xu
>            Assignee: Leonard Xu
>            Priority: Major
>             Fix For: 1.11.0, 1.12.0
>
>
> user case:
> {code:java}
> //source table 
> create table json_table( 
> w_es BIGINT, 
> w_type STRING, 
> w_isDdl BOOLEAN,
>  w_data ARRAY<ROW<pay_info STRING, online_fee DOUBLE, sign STRING, 
> account_pay_fee DOUBLE>>,
>  w_ts TIMESTAMP(3), 
> w_table STRING) WITH (
>   'connector.type' = 'kafka',
>   'connector.version' = '0.10',
>   'connector.topic' = 'json-test2',
>   'connector.properties.zookeeper.connect' = 'localhost:2181',
>   'connector.properties.bootstrap.servers' = 'localhost:9092',
>   'connector.properties.group.id' = 'test-jdbc',
>   'connector.startup-mode' = 'earliest-offset',
>   'format.type' = 'json',
>   'format.derive-schema' = 'true'
> )
> // real data:
> {"w_es":1589870637000,"w_type":"INSERT","w_isDdl":false,"w_data":[{"pay_info":"channelId=82&onlineFee=89.0&outTradeNo=0&payId=0&payType=02&rechargeId=4&totalFee=89.0&tradeStatus=success&userId=32590183789575&sign=00","online_fee":"89.0","sign":"00","account_pay_fee":"0.0"}],"w_ts":"2020-05-20T13:58:37.131Z","w_table":"cccc111"}
> //query
> select w_ts, 'test' as city1_id,  w_data[0].pay_info AS cate3_id,
>  w_data as pay_order_id from json_table
> {code}
> ~exception:~
> {code:java}
> //
> Caused by: java.lang.ArrayIndexOutOfBoundsException: 1427848Caused by: 
> java.lang.ArrayIndexOutOfBoundsException: 1427848 at 
> org.apache.flink.table.runtime.util.SegmentsUtil.getByteMultiSegments(SegmentsUtil.java:598)
>  at 
> org.apache.flink.table.runtime.util.SegmentsUtil.getByte(SegmentsUtil.java:590)
>  at 
> org.apache.flink.table.runtime.util.SegmentsUtil.bitGet(SegmentsUtil.java:534)
>  at 
> org.apache.flink.table.dataformat.BinaryArray.isNullAt(BinaryArray.java:117) 
> at StreamExecCalc$10.processElement(Unknown Source)
> {code}
>  
> Looks like in the codegen StreamExecCalc$10 operator some operation visit a 
> '-1' index which should be wrong, this bug exits both in 1.10 and 1.11
>  
> {code:java}
> public class StreamExecCalc$10 extends 
> org.apache.flink.table.runtime.operators.AbstractProcessStreamOperator
>     implements 
> org.apache.flink.streaming.api.operators.OneInputStreamOperator {
>     private final Object[] references;
>     private final org.apache.flink.table.dataformat.BinaryString str$3 = 
> org.apache.flink.table.dataformat.BinaryString.fromString("test");
>     private transient 
> org.apache.flink.table.runtime.typeutils.BaseArraySerializer typeSerializer$5;
>     final org.apache.flink.table.dataformat.BoxedWrapperRow out = new 
> org.apache.flink.table.dataformat.BoxedWrapperRow(4);
>     private final 
> org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new 
> org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);
>     public StreamExecCalc$10(
>         Object[] references,
>         org.apache.flink.streaming.runtime.tasks.StreamTask task,
>         org.apache.flink.streaming.api.graph.StreamConfig config,
>         org.apache.flink.streaming.api.operators.Output output) throws 
> Exception {
>         this.references = references;
>         typeSerializer$5 = 
> (((org.apache.flink.table.runtime.typeutils.BaseArraySerializer) 
> references[0]));
>         this.setup(task, config, output);
>     }
>     @Override
>     public void open() throws Exception {
>         super.open();
>     }
>     @Override
>     public void 
> processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord 
> element) throws Exception {
>         org.apache.flink.table.dataformat.BaseRow in1 = 
> (org.apache.flink.table.dataformat.BaseRow) element.getValue();
>         org.apache.flink.table.dataformat.SqlTimestamp field$2;
>         boolean isNull$2;
>         org.apache.flink.table.dataformat.BaseArray field$4;
>         boolean isNull$4;
>         org.apache.flink.table.dataformat.BaseArray field$6;
>         org.apache.flink.table.dataformat.BinaryString field$8;
>         boolean isNull$8;
>         org.apache.flink.table.dataformat.BinaryString result$9;
>         boolean isNull$9;
>         isNull$2 = in1.isNullAt(4);
>         field$2 = null;
>         if (!isNull$2) {
>             field$2 = in1.getTimestamp(4, 3);
>         }
>         isNull$4 = in1.isNullAt(3);
>         field$4 = null;
>         if (!isNull$4) {
>             field$4 = in1.getArray(3);
>         }
>         field$6 = field$4;
>         if (!isNull$4) {
>             field$6 = (org.apache.flink.table.dataformat.BaseArray) 
> (typeSerializer$5.copy(field$6));
>         }
>         out.setHeader(in1.getHeader());
>         if (isNull$2) {
>             out.setNullAt(0);
>         } else {
>             out.setNonPrimitiveValue(0, field$2);
>         }
>         if (false) {
>             out.setNullAt(1);
>         } else {
>             out.setNonPrimitiveValue(1, 
> ((org.apache.flink.table.dataformat.BinaryString) str$3));
>         }
>         boolean isNull$7 = isNull$4 || false || field$6.isNullAt(((int) 0) - 
> 1);
>         org.apache.flink.table.dataformat.BaseRow result$7 = isNull$7 ? null 
> : field$6.getRow(((int) 0) - 1, 4);
>         if (isNull$7) {
>             result$9 = 
> org.apache.flink.table.dataformat.BinaryString.EMPTY_UTF8;
>             isNull$9 = true;
>         }
>         else {
>             isNull$8 = result$7.isNullAt(0);
>             field$8 = 
> org.apache.flink.table.dataformat.BinaryString.EMPTY_UTF8;
>             if (!isNull$8) {
>                 field$8 = result$7.getString(0);
>             }
>             result$9 = field$8;
>             isNull$9 = isNull$8;
>         }
>         if (isNull$9) {
>             out.setNullAt(2);
>         } else {
>             out.setNonPrimitiveValue(2, result$9);
>         }
>         if (isNull$4) {
>             out.setNullAt(3);
>         } else {
>             out.setNonPrimitiveValue(3, field$6);
>         }
>         output.collect(outElement.replace(out));
>    }
>     @Override
>     public void close() throws Exception {
>         super.close();
>     }
> }
>     
> {code}
>  
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to