[ https://issues.apache.org/jira/browse/FLINK-35523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17869578#comment-17869578 ]
Jichao Wang edited comment on FLINK-35523 at 9/12/24 3:21 AM:
--------------------------------------------------------------

In response to this problem, I took the following steps:
# Confirm the scope of the issue, i.e. whether the vectorized read path of the Flink Hive connector supports complex nested fields in ORC and Parquet tables:
{code:sql}
create table parquet_array_struct(a array<struct<a1:int, a2:int>>) stored as parquet;
create table orc_array_struct(a array<struct<a1:int, a2:int>>) stored as orcfile;
{code}
The Flink Hive connector reads the orc_array_struct table normally, but throws the above exception when reading parquet_array_struct, so the problem only affects Parquet tables.
# Verify that setting _table.exec.hive.fallback-mapred-reader=true_ works around the issue (a sketch of setting this option is shown after this comment).
# Try to fix the null pointer exception by changing line 503 of the ParquetSplitReaderUtil class to the following code:
{code:java}
createWritableColumnVector(
        batchSize,
        arrayType.getElementType(),
        type.asGroupType().getType(0).asGroupType().getType(0),
        descriptors,
        depth + 2));
{code}
This removed the null pointer exception, but the following exception was thrown instead:
{code}
Caused by: java.lang.RuntimeException: Unsupported type in the list: ROW<`a1` INT>
	at org.apache.flink.formats.parquet.vector.reader.ArrayColumnReader.readPrimitiveTypedRow(ArrayColumnReader.java:175)
{code}
Finally, after confirming against the source code, the Hive connector does not support vectorized reading of complex nested fields in Parquet tables, such as array<struct<a1:int, a2:int>>. Based on the analysis above, I think we can detect this kind of complex nested type in advance in the org.apache.flink.connectors.hive.read.HiveInputFormat#isVectorizationUnsupported method and read the Hive table with the mapred reader instead; a rough sketch of such a check follows below.
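For reference, a minimal sketch of the workaround from step 2, assuming the same TableEnvironment setup as in the reproduction code in the description (only the option key table.exec.hive.fallback-mapred-reader comes from the analysis above; the snippet itself is illustrative, not part of the proposed fix):
{code:java}
// Fall back to the hadoop mapred RecordReader instead of the vectorized Parquet reader,
// which avoids the exception for columns such as array<struct<a1:int, a2:int>>.
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
tableEnv.getConfig().getConfiguration()
        .setString("table.exec.hive.fallback-mapred-reader", "true");

// Equivalent setting in the SQL client:
// SET 'table.exec.hive.fallback-mapred-reader' = 'true';
{code}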
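And a rough sketch of the proposed pre-check. HiveInputFormat#isVectorizationUnsupported is the existing method mentioned above; the class and helper below (NestedComplexTypeCheck / hasNestedComplexType) are hypothetical and only illustrate the idea of rejecting nested complex types such as array<struct<...>> before the vectorized reader is chosen:
{code:java}
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;

// Hypothetical helper: detects a complex type (ARRAY / MAP / ROW) that directly or
// indirectly contains another complex type, e.g. ARRAY<ROW<a1 INT, a2 INT>>.
public final class NestedComplexTypeCheck {

    static boolean isComplex(LogicalType type) {
        LogicalTypeRoot root = type.getTypeRoot();
        return root == LogicalTypeRoot.ARRAY
                || root == LogicalTypeRoot.MAP
                || root == LogicalTypeRoot.ROW;
    }

    public static boolean hasNestedComplexType(LogicalType type) {
        for (LogicalType child : type.getChildren()) {
            if ((isComplex(type) && isComplex(child)) || hasNestedComplexType(child)) {
                return true;
            }
        }
        return false;
    }
}
{code}
If isVectorizationUnsupported also returned true for fields where hasNestedComplexType is true, HiveInputFormat would fall back to the mapred reader for such tables instead of failing with the exceptions above.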

> When using the Hive connector to read a Hive table in Parquet format, a null pointer exception is thrown.
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-35523
>                 URL: https://issues.apache.org/jira/browse/FLINK-35523
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Hive
>    Affects Versions: 1.16.2, 1.19.0
>            Reporter: Jichao Wang
>            Priority: Major
>             Fix For: 2.0-preview
>
>
> When using the Hive connector to read a Hive table in Parquet format, a null pointer exception is thrown.
> The exception stack information is as follows:
> {noformat}
> java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
> 	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150) ~[flink-connector-files-1.16.2.jar:1.16.2]
> 	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105) [flink-connector-files-1.16.2.jar:1.16.2]
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_342]
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_342]
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_342]
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_342]
> 	at java.lang.Thread.run(Thread.java:750) [?:1.8.0_342]
> Caused by: java.io.IOException: java.lang.IndexOutOfBoundsException: Index: 1, Size: 1
> 	at org.apache.flink.connector.file.src.impl.HdpFileSourceSplitReader.fetch(HdpFileSourceSplitReader.java:40) ~[flink-connector-files-1.16.2.jar:1.16.2]
> 	at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) ~[flink-connector-files-1.16.2.jar:1.16.2]
> 	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142) ~[flink-connector-files-1.16.2.jar:1.16.2]
> 	... 6 more
> Caused by: java.lang.IndexOutOfBoundsException: Index: 1, Size: 1
> 	at java.util.ArrayList.rangeCheck(ArrayList.java:659) ~[?:1.8.0_342]
> 	at java.util.ArrayList.get(ArrayList.java:435) ~[?:1.8.0_342]
> 	at org.apache.flink.hive.shaded.parquet.schema.GroupType.getType(GroupType.java:216) ~[flink-connector-hive_2.12-1.16.2.jar:1.16.2]
> 	at org.apache.flink.hive.shaded.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:536) ~[flink-connector-hive_2.12-1.16.2.jar:1.16.2]
> 	at org.apache.flink.hive.shaded.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:503) ~[flink-connector-hive_2.12-1.16.2.jar:1.16.2]
> 	at org.apache.flink.hive.shaded.formats.parquet.ParquetVectorizedInputFormat.createWritableVectors(ParquetVectorizedInputFormat.java:277) ~[flink-connector-hive_2.12-1.16.2.jar:1.16.2]
> 	at org.apache.flink.hive.shaded.formats.parquet.ParquetVectorizedInputFormat.createReaderBatch(ParquetVectorizedInputFormat.java:266) ~[flink-connector-hive_2.12-1.16.2.jar:1.16.2]
> 	at org.apache.flink.hive.shaded.formats.parquet.ParquetVectorizedInputFormat.createPoolOfBatches(ParquetVectorizedInputFormat.java:256) ~[flink-connector-hive_2.12-1.16.2.jar:1.16.2]
> 	at org.apache.flink.hive.shaded.formats.parquet.ParquetVectorizedInputFormat.createReader(ParquetVectorizedInputFormat.java:139) ~[flink-connector-hive_2.12-1.16.2.jar:1.16.2]
> 	at org.apache.flink.hive.shaded.formats.parquet.ParquetVectorizedInputFormat.createReader(ParquetVectorizedInputFormat.java:75) ~[flink-connector-hive_2.12-1.16.2.jar:1.16.2]
> 	at org.apache.flink.connectors.hive.read.HiveInputFormat.createReader(HiveInputFormat.java:110) ~[flink-connector-hive_2.12-1.16.2.jar:1.16.2]
> 	at org.apache.flink.connectors.hive.read.HiveInputFormat.createReader(HiveInputFormat.java:65) ~[flink-connector-hive_2.12-1.16.2.jar:1.16.2]
> 	at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.checkSplitOrStartNext(FileSourceSplitReader.java:112) ~[flink-connector-files-1.16.2.jar:1.16.2]
> 	at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:65) ~[flink-connector-files-1.16.2.jar:1.16.2]
> 	at org.apache.flink.connector.file.src.impl.HdpFileSourceSplitReader.lambda$fetch$0(HdpFileSourceSplitReader.java:38) ~[flink-connector-files-1.16.2.jar:1.16.2]
> 	at org.apache.flink.runtime.security.SecurityUtils.runAtOneContext(SecurityUtils.java:108) ~[flink-dist-1.16.2.jar:1.16.2]
> 	at org.apache.flink.connector.file.src.impl.HdpFileSourceSplitReader.fetch(HdpFileSourceSplitReader.java:38) ~[flink-connector-files-1.16.2.jar:1.16.2]
> 	at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) ~[flink-connector-files-1.16.2.jar:1.16.2]
> 	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142) ~[flink-connector-files-1.16.2.jar:1.16.2]
> 	... 6 more{noformat}
> Here's how to reproduce:
> 1. Create a parquet table parquet_array_struct in Hive
> {code:sql}
> create table parquet_array_struct(a array<struct<a1:int, a2:int>>) stored as parquet;
> {code}
> 2. Insert data into the parquet_array_struct table:
> {code:sql}
> insert into parquet_array_struct select array(named_struct('a1', 1, 'a2', 2))
> {code}
> 3. Develop a Flink SQL task to read data from the parquet_array_struct table:
> {code:java}
> EnvironmentSettings settings = EnvironmentSettings.inBatchMode();
> TableEnvironment tableEnv = TableEnvironment.create(settings);
> String name = "myhive";
> String defaultDatabase = "default";
> String hiveConfDir = null;
> HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
> tableEnv.registerCatalog("myhive", hive);
> // set the HiveCatalog as the current catalog of the session
> tableEnv.useCatalog("myhive");
> tableEnv.useDatabase(defaultDatabase);
> tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
> tableEnv.executeSql("select * from parquet_array_struct");
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)