[ https://issues.apache.org/jira/browse/FLINK-17642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17229840#comment-17229840 ]
Nikola commented on FLINK-17642:
--------------------------------
[~lzljs3620320] I am not sure about any other framework.
The issue we are facing is not that there is an exception. Of course, if the
file is broken, it should throw an exception. The issue is that the exception
cannot be caught in Flink, and the whole pipeline fails because of it.
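The reason a try-catch around the job definition sees nothing is that defining the pipeline only describes the work: according to the stack traces in the issue below, the ORC file is opened later, when `DataSourceTask.invoke()` calls `OrcRowInputFormat.open()` inside a running task. The toy model below is plain Java, not Flink code, and `DeferredPlanDemo`, `defineSource` and `openBrokenFile` are hypothetical names; it only sketches this deferred-execution effect under that assumption.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.function.Supplier;

/**
 * Toy model of deferred execution (hypothetical, not Flink code): defining the
 * source only records what to do; the file is opened later, when the plan runs.
 */
final class DeferredPlanDemo {
    private DeferredPlanDemo() {}

    /** Hypothetical stand-in for opening a corrupt ORC file. */
    static List<String> openBrokenFile() throws IOException {
        throw new IOException("Malformed ORC file");
    }

    /** "Defining the source": captures the work in a Supplier and opens nothing. */
    static Supplier<List<String>> defineSource() {
        return () -> {
            try {
                return openBrokenFile();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        };
    }

    /** A try-catch around the definition finds nothing to catch. */
    static String failureDuringDefinition() {
        try {
            defineSource();
            return "none";
        } catch (RuntimeException e) {
            return e.getMessage();
        }
    }

    /** Only running the plan triggers the open call and its exception. */
    static String failureDuringExecution() {
        Supplier<List<String>> plan = defineSource();
        try {
            plan.get();
            return "none";
        } catch (UncheckedIOException e) {
            return e.getCause().getMessage();
        }
    }
}
```

Here `failureDuringDefinition()` returns `"none"` and `failureDuringExecution()` returns `"Malformed ORC file"`. In Flink itself, the failure would presumably only be reported when the job runs (for example from `execute()` or `collect()`), after the source task has already failed, so user code gets no chance to skip the file and continue.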
> Exception while reading broken ORC file is hidden
> -------------------------------------------------
>
> Key: FLINK-17642
> URL: https://issues.apache.org/jira/browse/FLINK-17642
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / ORC
> Affects Versions: 1.8.3, 1.9.3, 1.10.1
> Reporter: Nikola
> Priority: Major
> Fix For: 1.12.0, 1.10.3, 1.11.3
>
>
> I have a simple setup of a batch job like this:
> {code:java}
> BatchTableEnvironment tableEnvFirst = BatchTableEnvironment.create(env);
> // Table source over the ORC files under "path" (true = enumerate directories recursively)
> OrcTableSource orcTableSource = OrcTableSource.builder()
>     .path("path", true)
>     .forOrcSchema(ORC.getSchema())
>     .withConfiguration(hdfsConfig)
>     .build();
> tableEnvFirst.registerTableSource("table", orcTableSource);
> Table nnfTable = tableEnvFirst.sqlQuery(sqlString);
> // Only describes the result; no ORC file is read until the job actually runs
> return tableEnvFirst.toDataSet(nnfTable, Row.class);{code}
>
> This works fine for reading ORC files from HDFS as a DataSet.
> However, some of the ORC files are broken. "Broken" means that they
> are invalid in some way and cannot be processed or fetched normally; reading
> them throws exceptions. Examples:
> {code:java}
> org.apache.orc.FileFormatException: Malformed ORC file /user/hdfs/orcfile-1
> Invalid postscript length 2
> at org.apache.orc.impl.ReaderImpl.ensureOrcFooter(ReaderImpl.java:258)
> at org.apache.orc.impl.ReaderImpl.extractFileTail(ReaderImpl.java:562)
> at org.apache.orc.impl.ReaderImpl.<init>(ReaderImpl.java:370)
> at org.apache.orc.OrcFile.createReader(OrcFile.java:342)
> at org.apache.flink.orc.OrcRowInputFormat.open(OrcRowInputFormat.java:225)
> at org.apache.flink.orc.OrcRowInputFormat.open(OrcRowInputFormat.java:63)
> at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at java.lang.Thread.run(Thread.java:748){code}
>
> {code:java}
> com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero).
> at com.google.protobuf.InvalidProtocolBufferException.invalidTag(InvalidProtocolBufferException.java:89)
> at com.google.protobuf.CodedInputStream.readTag(CodedInputStream.java:108)
> at org.apache.orc.OrcProto$PostScript.<init>(OrcProto.java:18526)
> at org.apache.orc.OrcProto$PostScript.<init>(OrcProto.java:18490)
> at org.apache.orc.OrcProto$PostScript$1.parsePartialFrom(OrcProto.java:18628)
> at org.apache.orc.OrcProto$PostScript$1.parsePartialFrom(OrcProto.java:18623)
> at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:89)
> at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:95)
> at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:49)
> at org.apache.orc.OrcProto$PostScript.parseFrom(OrcProto.java:19022)
> at org.apache.orc.impl.ReaderImpl.extractPostScript(ReaderImpl.java:436)
> at org.apache.orc.impl.ReaderImpl.extractFileTail(ReaderImpl.java:564)
> at org.apache.orc.impl.ReaderImpl.<init>(ReaderImpl.java:370)
> at org.apache.orc.OrcFile.createReader(OrcFile.java:342)
> at org.apache.flink.orc.OrcRowInputFormat.open(OrcRowInputFormat.java:225)
> at org.apache.flink.orc.OrcRowInputFormat.open(OrcRowInputFormat.java:63)
> at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at java.lang.Thread.run(Thread.java:748){code}
>
> Since these specific files are broken, it is OK that an exception is thrown.
> However, the issue is that I cannot catch those exceptions, and they make my
> job fail. I tried to wrap everything in a try-catch block just to see what
> I can catch and handle, but it seems that when Flink runs the job, the code is
> not executed from that place, but rather from DataSourceTask.invoke().
> I dug a little to find out why I don't get the exception, and I can
> see that {{OrcTableSource}} creates an {{OrcRowInputFormat}} instance
> [here|#L157], whose open() is then called from DataSourceTask.invoke().
> open() has this signature:
> {code:java}
> public void open(FileInputSplit fileSplit) throws IOException { ... }
> {code}
>
> So open() throws the exception, but I am not able to catch it.
> Is what I am doing correct, or is there another way to handle exceptions
> coming from DataSourceTask.invoke()? In general, my goal is to ignore
> all broken/corrupted ORC files, but that does not seem to be possible.
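Since the stated goal is to ignore corrupt files, one workaround worth considering is to wrap the input format so that an `IOException` thrown from `open()` turns that split into an empty one instead of failing the task. Both exceptions in the traces reach `open()`, whose signature only declares `IOException`, so catching that type should cover them. The sketch below is plain Java under a loud assumption: `SplitFormat` is a hypothetical, simplified stand-in for the `open()`/`reachedEnd()`/`nextRecord()`/`close()` contract of Flink's `InputFormat` (Flink's `nextRecord` also takes a reuse object), and `SkipBrokenSplits`, `InMemoryFormat` and `SkipBrokenDemo` are hypothetical names. Applying it in Flink would presumably require a custom input format used through the DataSet API rather than `OrcTableSource`; that has not been verified here.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified form of an open/reachedEnd/nextRecord/close input contract. */
interface SplitFormat<S, T> {
    void open(S split) throws IOException;
    boolean reachedEnd() throws IOException;
    T nextRecord() throws IOException;
    void close() throws IOException;
}

/** Wrapper that turns a split whose open() throws an IOException into an empty split. */
final class SkipBrokenSplits<S, T> implements SplitFormat<S, T> {
    private final SplitFormat<S, T> delegate;
    private final List<S> skipped = new ArrayList<>();
    private boolean broken;

    SkipBrokenSplits(SplitFormat<S, T> delegate) { this.delegate = delegate; }

    @Override
    public void open(S split) throws IOException {
        broken = false;
        try {
            delegate.open(split);
        } catch (IOException e) {
            // Corrupt input: remember the split and emit no records for it.
            broken = true;
            skipped.add(split);
        }
    }

    @Override
    public boolean reachedEnd() throws IOException { return broken || delegate.reachedEnd(); }

    @Override
    public T nextRecord() throws IOException { return broken ? null : delegate.nextRecord(); }

    @Override
    public void close() throws IOException {
        if (!broken) {
            delegate.close(); // skip closing a split whose open() failed
        }
    }

    List<S> skippedSplits() { return skipped; }
}

/** Toy delegate: a split is a file name; names missing from the map act as corrupt files. */
final class InMemoryFormat implements SplitFormat<String, String> {
    private final Map<String, List<String>> files;
    private Iterator<String> rows;

    InMemoryFormat(Map<String, List<String>> files) { this.files = files; }

    @Override
    public void open(String split) throws IOException {
        List<String> content = files.get(split);
        if (content == null) {
            throw new IOException("Malformed ORC file " + split);
        }
        rows = content.iterator();
    }

    @Override public boolean reachedEnd() { return !rows.hasNext(); }
    @Override public String nextRecord() { return rows.next(); }
    @Override public void close() { rows = null; }
}

/** Mimics the runtime loop: open each split, read until reachedEnd(), then close. */
final class SkipBrokenDemo {
    private SkipBrokenDemo() {}

    static <S, T> List<T> readAll(SplitFormat<S, T> format, List<S> splits) throws IOException {
        List<T> out = new ArrayList<>();
        for (S split : splits) {
            format.open(split);
            while (!format.reachedEnd()) {
                out.add(format.nextRecord());
            }
            format.close();
        }
        return out;
    }
}
```

With a corrupt file in the middle of the split list, `readAll` on the wrapped format returns the records of the readable files and `skippedSplits()` lists the broken one, whereas the unwrapped format aborts on the first `IOException`, much like the job failure described in this issue. Silently dropping input is a trade-off, so logging or counting the skipped splits is advisable.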