[
https://issues.apache.org/jira/browse/FLINK-29547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Xintong Song updated FLINK-29547:
---------------------------------
Fix Version/s: 1.18.0
(was: 1.17.0)
> Selecting a[1] (array type) from a Parquet complex type throws
> ClassCastException
> ----------------------------------------------------------------------------------
>
> Key: FLINK-29547
> URL: https://issues.apache.org/jira/browse/FLINK-29547
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.16.0
> Reporter: dalongliu
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.18.0
>
>
> The following SQL test in HiveTableSourceITCase throws a
> ClassCastException.
> {code:java}
> batchTableEnv.executeSql(
>         "create table parquet_complex_type_test("
>                 + "a array<int>, m map<int,string>, s struct<f1:int,f2:bigint>) stored as parquet");
> String[] modules = batchTableEnv.listModules();
> // load the hive module so that we can use the array, map, and named_struct
> // functions for conveniently writing complex data
> batchTableEnv.loadModule("hive", new HiveModule());
> batchTableEnv.useModules("hive", CoreModuleFactory.IDENTIFIER);
> batchTableEnv
>         .executeSql(
>                 "insert into parquet_complex_type_test"
>                         + " select array(1, 2), map(1, 'val1', 2, 'val2'),"
>                         + " named_struct('f1', 1, 'f2', 2)")
>         .await();
> Table src = batchTableEnv.sqlQuery("select a[1] from parquet_complex_type_test");
> List<Row> rows = CollectionUtil.iteratorToList(src.execute().collect());{code}
> The exception stack trace:
> {code:java}
> Caused by: java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to [Ljava.lang.Integer;
>     at BatchExecCalc$37.processElement(Unknown Source)
>     at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99)
>     at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:80)
>     at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
>     at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313)
>     at org.apache.flink.streaming.api.operators.source.NoOpTimestampsAndWatermarks$TimestampsOnlyOutput.collect(NoOpTimestampsAndWatermarks.java:98)
>     at org.apache.flink.streaming.api.operators.source.NoOpTimestampsAndWatermarks$TimestampsOnlyOutput.collect(NoOpTimestampsAndWatermarks.java:92)
>     at org.apache.flink.connector.file.src.impl.FileSourceRecordEmitter.emitRecord(FileSourceRecordEmitter.java:45)
>     at org.apache.flink.connector.file.src.impl.FileSourceRecordEmitter.emitRecord(FileSourceRecordEmitter.java:35)
>     at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:144)
>     at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:401)
>     at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>     at java.lang.Thread.run(Thread.java:748){code}
>
> After debugging the code, I found the root cause: the source operator reads
> the array data from Parquet in a vectorized way and returns a
> ColumnarArrayData. The calc operator then converts it to a GenericArrayData
> whose backing array has runtime type Object[] rather than Integer[]. When
> ArrayObjectArrayConverter#toExternal converts this array to its external
> representation, it still returns an Object[], so the subsequent forced cast
> to Integer[] throws the exception.
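> The cast failure can be reproduced with plain Java, independent of Flink (a
> minimal sketch; the class and variable names below are illustrative, not
> Flink code): an array whose runtime type is Object[] cannot be cast to
> Integer[], even when every element it holds is an Integer, so the converter
> has to copy into a properly typed array instead.
> {code:java}
> import java.util.Arrays;
>
> public class ArrayCastDemo {
>     public static void main(String[] args) {
>         // Like the GenericArrayData case: every element is an Integer,
>         // but the runtime type of the container is Object[].
>         Object[] data = new Object[] {1, 2};
>
>         // This compiles but throws ClassCastException at runtime, which is
>         // the same failure mode as in the stack trace above.
>         try {
>             Integer[] bad = (Integer[]) data;
>         } catch (ClassCastException e) {
>             System.out.println("cast failed: " + e);
>         }
>
>         // Copying into a typed array works: a new Integer[] is allocated
>         // and each element is cast individually.
>         Integer[] ok = Arrays.copyOf(data, data.length, Integer[].class);
>         System.out.println(Arrays.toString(ok)); // prints [1, 2]
>     }
> }
> {code}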
--
This message was sent by Atlassian Jira
(v8.20.10#820010)