[ https://issues.apache.org/jira/browse/FLINK-35523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17869578#comment-17869578 ]

Jichao Wang edited comment on FLINK-35523 at 9/12/24 3:21 AM:
--------------------------------------------------------------

In response to this problem, I took the following steps:
 # Confirm the scope of the issue: check whether the vectorized read path of the Flink Hive connector supports complex nested fields in ORC tables and Parquet tables
{code:sql}
create table parquet_array_struct(a array<struct<a1:int, a2:int>>) stored as parquet;
create table orc_array_struct(a array<struct<a1:int, a2:int>>) stored as orcfile;
{code}
The Flink connector reads the orc_array_struct table normally, but the above exception is thrown when reading parquet_array_struct, so the problem affects only Parquet tables;

 # Verify the workaround: setting _table.exec.hive.fallback-mapred-reader=true_ circumvents this issue (a sketch of enabling it follows);
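For reference, a minimal sketch of enabling this option through the Table API; the option name comes from this issue, while the surrounding setup is illustrative:
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class FallbackReaderWorkaround {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.inBatchMode());
        // Fall back to Hive's mapred record reader instead of the vectorized
        // Parquet reader, which avoids the NPE described in this issue.
        tableEnv.getConfig().getConfiguration()
                .setBoolean("table.exec.hive.fallback-mapred-reader", true);
    }
}
{code}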
 # Try to fix the null pointer exception: change line 503 of the ParquetSplitReaderUtil class to the following code (a schema sketch after this step illustrates the nesting being navigated):
{code:java}
createWritableColumnVector(
        batchSize,
        arrayType.getElementType(),
        type.asGroupType().getType(0).asGroupType().getType(0),
        descriptors,
        depth + 2));
{code}
Although the null pointer exception was fixed, the following exception was 
thrown:
{noformat}
Caused by: java.lang.RuntimeException: Unsupported type in the list: ROW<`a1` INT>
	at org.apache.flink.formats.parquet.vector.reader.ArrayColumnReader.readPrimitiveTypedRow(ArrayColumnReader.java:175)
{noformat}
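For context, here is a minimal, hypothetical sketch (using the unshaded Parquet API) of the schema Hive typically writes for this column; the wrapper names "bag" and "array_element" are Hive's conventional 3-level LIST encoding and are assumptions here, not taken from the stack trace. It shows why the element struct sits two group levels below the list field, which is what the depth + 2 change above tries to account for:
{code:java}
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class NestedListSchemaDemo {
    public static void main(String[] args) {
        // array<struct<a1:int, a2:int>> in Hive's classic 3-level LIST encoding.
        MessageType schema = MessageTypeParser.parseMessageType(
                "message hive_schema {\n"
                        + "  optional group a (LIST) {\n"
                        + "    repeated group bag {\n"
                        + "      optional group array_element {\n"
                        + "        optional int32 a1;\n"
                        + "        optional int32 a2;\n"
                        + "      }\n"
                        + "    }\n"
                        + "  }\n"
                        + "}");
        // Two asGroupType().getType(0) hops below the list field reach the
        // element struct, mirroring the traversal in the attempted fix above.
        System.out.println(
                schema.getType(0).asGroupType().getType(0).asGroupType().getType(0));
    }
}
{code}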
Finally, I confirmed from the source code that the Hive connector does not support vectorized reading of complex nested fields in Parquet tables, such as array<struct<a1:int, a2:int>>.

Based on the above analysis, I think we can detect such complex nested types in advance in the org.apache.flink.connectors.hive.read.HiveInputFormat#isVectorizationUnsupported method and read the Hive table via the mapred path instead; a sketch of such a check follows.
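A hypothetical sketch of the detection (not the actual patch; the class and method names here are illustrative, only the LogicalType API is real):
{code:java}
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.RowType;

public class NestedComplexTypeCheck {

    /** The complex types handled by dedicated vectorized column readers. */
    static boolean isComplex(LogicalType t) {
        return t instanceof ArrayType || t instanceof MapType || t instanceof RowType;
    }

    /**
     * True for a complex type that contains another complex type, e.g.
     * array<struct<a1:int, a2:int>>. Under the proposal above, such columns
     * would make isVectorizationUnsupported return true, routing the split
     * to the mapred reader.
     */
    static boolean isNestedComplexType(LogicalType t) {
        return isComplex(t)
                && t.getChildren().stream()
                        .anyMatch(c -> isComplex(c) || isNestedComplexType(c));
    }
}
{code}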



> When using the Hive connector to read a Hive table in Parquet format, a null 
> pointer exception is thrown.
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-35523
>                 URL: https://issues.apache.org/jira/browse/FLINK-35523
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Hive
>    Affects Versions: 1.16.2, 1.19.0
>            Reporter: Jichao Wang
>            Priority: Major
>             Fix For: 2.0-preview
>
>
> When using the Hive connector to read a Hive table in Parquet format, a null 
> pointer exception is thrown.
> The exception stack information is as follows:
> {noformat}
> java.lang.RuntimeException: SplitFetcher thread 0 received unexpected 
> exception while polling the records
>       at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
>  ~[flink-connector-files-1.16.2.jar:1.16.2]
>       at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
>  [flink-connector-files-1.16.2.jar:1.16.2]
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> [?:1.8.0_342]
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
> [?:1.8.0_342]
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  [?:1.8.0_342]
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  [?:1.8.0_342]
>       at java.lang.Thread.run(Thread.java:750) [?:1.8.0_342]
> Caused by: java.io.IOException: java.lang.IndexOutOfBoundsException: Index: 
> 1, Size: 1
>       at 
> org.apache.flink.connector.file.src.impl.HdpFileSourceSplitReader.fetch(HdpFileSourceSplitReader.java:40)
>  ~[flink-connector-files-1.16.2.jar:1.16.2]
>       at 
> org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
>  ~[flink-connector-files-1.16.2.jar:1.16.2]
>       at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
>  ~[flink-connector-files-1.16.2.jar:1.16.2]
>       ... 6 more
> Caused by: java.lang.IndexOutOfBoundsException: Index: 1, Size: 1
>       at java.util.ArrayList.rangeCheck(ArrayList.java:659) ~[?:1.8.0_342]
>       at java.util.ArrayList.get(ArrayList.java:435) ~[?:1.8.0_342]
>       at 
> org.apache.flink.hive.shaded.parquet.schema.GroupType.getType(GroupType.java:216)
>  ~[flink-connector-hive_2.12-1.16.2.jar:1.16.2]
>       at 
> org.apache.flink.hive.shaded.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:536)
>  ~[flink-connector-hive_2.12-1.16.2.jar:1.16.2]
>       at 
> org.apache.flink.hive.shaded.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:503)
>  ~[flink-connector-hive_2.12-1.16.2.jar:1.16.2]
>       at 
> org.apache.flink.hive.shaded.formats.parquet.ParquetVectorizedInputFormat.createWritableVectors(ParquetVectorizedInputFormat.java:277)
>  ~[flink-connector-hive_2.12-1.16.2.jar:1.16.2]
>       at 
> org.apache.flink.hive.shaded.formats.parquet.ParquetVectorizedInputFormat.createReaderBatch(ParquetVectorizedInputFormat.java:266)
>  ~[flink-connector-hive_2.12-1.16.2.jar:1.16.2]
>       at 
> org.apache.flink.hive.shaded.formats.parquet.ParquetVectorizedInputFormat.createPoolOfBatches(ParquetVectorizedInputFormat.java:256)
>  ~[flink-connector-hive_2.12-1.16.2.jar:1.16.2]
>       at 
> org.apache.flink.hive.shaded.formats.parquet.ParquetVectorizedInputFormat.createReader(ParquetVectorizedInputFormat.java:139)
>  ~[flink-connector-hive_2.12-1.16.2.jar:1.16.2]
>       at 
> org.apache.flink.hive.shaded.formats.parquet.ParquetVectorizedInputFormat.createReader(ParquetVectorizedInputFormat.java:75)
>  ~[flink-connector-hive_2.12-1.16.2.jar:1.16.2]
>       at 
> org.apache.flink.connectors.hive.read.HiveInputFormat.createReader(HiveInputFormat.java:110)
>  ~[flink-connector-hive_2.12-1.16.2.jar:1.16.2]
>       at 
> org.apache.flink.connectors.hive.read.HiveInputFormat.createReader(HiveInputFormat.java:65)
>  ~[flink-connector-hive_2.12-1.16.2.jar:1.16.2]
>       at 
> org.apache.flink.connector.file.src.impl.FileSourceSplitReader.checkSplitOrStartNext(FileSourceSplitReader.java:112)
>  ~[flink-connector-files-1.16.2.jar:1.16.2]
>       at 
> org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:65)
>  ~[flink-connector-files-1.16.2.jar:1.16.2]
>       at 
> org.apache.flink.connector.file.src.impl.HdpFileSourceSplitReader.lambda$fetch$0(HdpFileSourceSplitReader.java:38)
>  ~[flink-connector-files-1.16.2.jar:1.16.2]
>       at 
> org.apache.flink.runtime.security.SecurityUtils.runAtOneContext(SecurityUtils.java:108)
>  ~[flink-dist-1.16.2.jar:1.16.2]
>       at 
> org.apache.flink.connector.file.src.impl.HdpFileSourceSplitReader.fetch(HdpFileSourceSplitReader.java:38)
>  ~[flink-connector-files-1.16.2.jar:1.16.2]
>       at 
> org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
>  ~[flink-connector-files-1.16.2.jar:1.16.2]
>       at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
>  ~[flink-connector-files-1.16.2.jar:1.16.2]
>       ... 6 more{noformat}
> Here's how to reproduce:
> 1. Create a parquet table parquet_array_struct in Hive
> {code:sql}
> create table parquet_array_struct(a array<struct<a1:int, a2:int>>) stored as parquet;
> {code}
> 2. Insert data into the parquet_array_struct table:
> {code:sql}
> insert into parquet_array_struct select array(named_struct('a1', 1, 'a2', 2));
> {code}
> 3. Develop a Flink SQL task to read data from parquet_array_struct tables:
> {code:java}
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.SqlDialect;
> import org.apache.flink.table.api.TableEnvironment;
> import org.apache.flink.table.catalog.hive.HiveCatalog;
>
> EnvironmentSettings settings = EnvironmentSettings.inBatchMode();
> TableEnvironment tableEnv = TableEnvironment.create(settings);
> String name            = "myhive";
> String defaultDatabase = "default";
> String hiveConfDir     = null;
> HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
> tableEnv.registerCatalog("myhive", hive);
> // set the HiveCatalog as the current catalog of the session
> tableEnv.useCatalog("myhive");
> tableEnv.useDatabase(defaultDatabase);
> tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
> // printing the result consumes the splits and surfaces the exception
> tableEnv.executeSql("select * from parquet_array_struct").print();
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
