[ 
https://issues.apache.org/jira/browse/FLINK-19817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17221185#comment-17221185
 ] 

CloseRiver commented on FLINK-19817:
------------------------------------

hi [~lzljs3620320],this is my sql

 
{code:java}
CREATE TABLE hdfs_source (    log_timestamp BIGINT,    ip STRING,    field 
STRING,    dt STRING,    `hour` STRING) PARTITIONED BY (dt, `hour`) WITH (  
'connector'='filesystem',  'path' = 'hdfs://xxx',  'format'='parquet');
CREATE TABLE print_sink (    log_timestamp BIGINT,    ip STRING,    field 
STRING,    dt STRING,    `hour` STRING)WITH (  'connector'='print' );
insert into print_sink select * from hdfs_source;
{code}
there are no Complex types.

When i dig into the code accroading to above exception,I found that the type 
of`schema` in ParquetColumnarRowSplitReader is `MessageType` which is sub-class 
of GroupType.

When invoke checkSchema() in `init`,the `t.isPrimitive()` is always false which 
caused the exception.

 

> Is FileSystem source unsupported?
> ---------------------------------
>
>                 Key: FLINK-19817
>                 URL: https://issues.apache.org/jira/browse/FLINK-19817
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / FileSystem
>            Reporter: CloseRiver
>            Priority: Minor
>
> When I create a table with filesystem connector + parquet format.Then read 
> from the table and received the following exception.
>  
> {code:java}
> java.lang.UnsupportedOperationException: Complex types not supported.    at 
> org.apache.flink.formats.parquet.vector.ParquetColumnarRowSplitReader.checkSchema(ParquetColumnarRowSplitReader.java:226)
>     at 
> org.apache.flink.formats.parquet.vector.ParquetColumnarRowSplitReader.<init>(ParquetColumnarRowSplitReader.java:144)
>     at 
> org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.genPartColumnarRowReader(ParquetSplitReaderUtil.java:131)
>     at 
> org.apache.flink.formats.parquet.ParquetFileSystemFormatFactory$ParquetInputFormat.open(ParquetFileSystemFormatFactory.java:204)
>     at 
> org.apache.flink.formats.parquet.ParquetFileSystemFormatFactory$ParquetInputFormat.open(ParquetFileSystemFormatFactory.java:159)
>     at 
> org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:85)
>     at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>     at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>     at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
> {code}
> Then I found the description in official document
>  
> `File system sources for streaming is still under development. In the future, 
> the community will add support for common streaming use cases, i.e., 
> partition and directory monitoring.`
> means that read from flilesystem is not supported currently.
> But the above exception make users confused.If there is a msg about reading 
> from filesystem is unsupported will be friendly.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to