[
https://issues.apache.org/jira/browse/HUDI-2875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ZiyueGuan updated HUDI-2875:
----------------------------
Description:
Problem:
Some corrupted parquet files are generated, and exceptions such as the
following are thrown when they are read:
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value
at 0 in block -1 in file <FilePath>
at
org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:251)
at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132)
at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136)
at
org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:49)
at
org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
at
org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:112)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
... 4 more
Caused by: org.apache.parquet.io.ParquetDecodingException: could not read page
Page [bytes.size=1054316, valueCount=237, uncompressedSize=1054316] in col
required binary col
at
org.apache.parquet.column.impl.ColumnReaderImpl.readPageV1(ColumnReaderImpl.java:599)
at
org.apache.parquet.column.impl.ColumnReaderImpl.access$300(ColumnReaderImpl.java:57)
at
org.apache.parquet.column.impl.ColumnReaderImpl$3.visit(ColumnReaderImpl.java:536)
at
org.apache.parquet.column.impl.ColumnReaderImpl$3.visit(ColumnReaderImpl.java:533)
at org.apache.parquet.column.page.DataPageV1.accept(DataPageV1.java:95)
at
org.apache.parquet.column.impl.ColumnReaderImpl.readPage(ColumnReaderImpl.java:533)
at
org.apache.parquet.column.impl.ColumnReaderImpl.checkRead(ColumnReaderImpl.java:525)
at
org.apache.parquet.column.impl.ColumnReaderImpl.consume(ColumnReaderImpl.java:638)
at
org.apache.parquet.column.impl.ColumnReaderImpl.<init>(ColumnReaderImpl.java:353)
at
org.apache.parquet.column.impl.ColumnReadStoreImpl.newMemColumnReader(ColumnReadStoreImpl.java:80)
at
org.apache.parquet.column.impl.ColumnReadStoreImpl.getColumnReader(ColumnReadStoreImpl.java:75)
at
org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:271)
at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:147)
at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:109)
at
org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:165)
at
org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:109)
at
org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:137)
at
org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:222)
... 11 more
Caused by: java.io.EOFException
at java.io.DataInputStream.readFully(DataInputStream.java:197)
at java.io.DataInputStream.readFully(DataInputStream.java:169)
at
org.apache.parquet.bytes.BytesInput$StreamBytesInput.toByteArray(BytesInput.java:286)
at org.apache.parquet.bytes.BytesInput.toByteBuffer(BytesInput.java:237)
at org.apache.parquet.bytes.BytesInput.toInputStream(BytesInput.java:246)
at
org.apache.parquet.column.impl.ColumnReaderImpl.readPageV1(ColumnReaderImpl.java:592)
How to reproduce:
We need a way to interrupt one task without shutting down the JVM; Spark
speculation is one such trigger. When speculation kills a task, other tasks
running on the same executor risk generating a corrupted parquet file. This
does not always happen: roughly half of the affected tasks fail with an
exception, while a few succeed without any signal.
RootCause:
ParquetWriter is not thread safe, so callers must guarantee that it is never
invoked concurrently.
In the following code:
[https://github.com/apache/hudi/blob/master/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMergeHelper.java#L103]
we call write and close on the same parquet writer concurrently, so data may
still be written while close runs. In the close method, the compressor (a
stateful class parquet uses for compression) is reset and returned to a pool
for later reuse. Because of the concurrent write, data keeps being pushed into
the compressor even after it has been reset. The compressor also contains a
check for invalid use, which is why some of these races throw an exception
rather than silently produce a corrupted parquet file.
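One way to provide that guarantee is to serialize write and close on a single
monitor so close can never overlap an in-flight write. A minimal sketch
follows (SafeParquetWriter is a hypothetical wrapper for illustration, not the
committed Hudi fix):
{code:java}
import java.io.Closeable;
import java.io.IOException;

import org.apache.parquet.hadoop.ParquetWriter;

// Hypothetical wrapper: write() and close() share one monitor, so the
// compressor is reset and returned to the pool exactly once, and never while
// a write is still pushing bytes into it.
public class SafeParquetWriter<T> implements Closeable {
  private final ParquetWriter<T> delegate;
  private boolean closed = false;

  public SafeParquetWriter(ParquetWriter<T> delegate) {
    this.delegate = delegate;
  }

  public synchronized void write(T record) throws IOException {
    if (closed) {
      throw new IOException("Writer already closed");
    }
    delegate.write(record);
  }

  @Override
  public synchronized void close() throws IOException {
    if (!closed) {
      closed = true;
      delegate.close(); // no write() can be in flight at this point
    }
  }
}
{code}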
Validation:
The current fix has been validated in a production environment. One signal
that the fix is working is that no tasks should fail any more with errors like
"BlockCompressorStream: write beyond end of stream", because once concurrent
writes are eliminated, the BlockCompressorStream check is no longer triggered.
was:
Problem:
Some corrupted parquet files are generated, and exceptions such as the
following are thrown when they are read:
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value
at 0 in block -1 in file <FilePath>
at
org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:251)
at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132)
at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136)
at
org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:49)
at
org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
at
org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:112)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
... 4 more
Caused by: org.apache.parquet.io.ParquetDecodingException: could not read page
Page [bytes.size=1054316, valueCount=237, uncompressedSize=1054316] in col
required binary col
at
org.apache.parquet.column.impl.ColumnReaderImpl.readPageV1(ColumnReaderImpl.java:599)
at
org.apache.parquet.column.impl.ColumnReaderImpl.access$300(ColumnReaderImpl.java:57)
at
org.apache.parquet.column.impl.ColumnReaderImpl$3.visit(ColumnReaderImpl.java:536)
at
org.apache.parquet.column.impl.ColumnReaderImpl$3.visit(ColumnReaderImpl.java:533)
at org.apache.parquet.column.page.DataPageV1.accept(DataPageV1.java:95)
at
org.apache.parquet.column.impl.ColumnReaderImpl.readPage(ColumnReaderImpl.java:533)
at
org.apache.parquet.column.impl.ColumnReaderImpl.checkRead(ColumnReaderImpl.java:525)
at
org.apache.parquet.column.impl.ColumnReaderImpl.consume(ColumnReaderImpl.java:638)
at
org.apache.parquet.column.impl.ColumnReaderImpl.<init>(ColumnReaderImpl.java:353)
at
org.apache.parquet.column.impl.ColumnReadStoreImpl.newMemColumnReader(ColumnReadStoreImpl.java:80)
at
org.apache.parquet.column.impl.ColumnReadStoreImpl.getColumnReader(ColumnReadStoreImpl.java:75)
at
org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:271)
at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:147)
at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:109)
at
org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:165)
at
org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:109)
at
org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:137)
at
org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:222)
... 11 more
Caused by: java.io.EOFException
at java.io.DataInputStream.readFully(DataInputStream.java:197)
at java.io.DataInputStream.readFully(DataInputStream.java:169)
at
org.apache.parquet.bytes.BytesInput$StreamBytesInput.toByteArray(BytesInput.java:286)
at org.apache.parquet.bytes.BytesInput.toByteBuffer(BytesInput.java:237)
at org.apache.parquet.bytes.BytesInput.toInputStream(BytesInput.java:246)
at
org.apache.parquet.column.impl.ColumnReaderImpl.readPageV1(ColumnReaderImpl.java:592)
How to reproduce:
We need a way to interrupt one task without shutting down the JVM; Spark
speculation is one such trigger. When speculation is triggered, tasks running
at the same time or later risk generating a corrupted parquet file. Roughly
half of them fail with an exception, while a few succeed without any signal.
RootCause:
ParquetWriter is not thread safe, so callers must guarantee that it is never
invoked concurrently. In the following code:
https://github.com/apache/hudi/blob/master/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMergeHelper.java#L103
we call write and close on the same parquet writer concurrently. Inside the
parquet writer there is a pool of Compressor objects used to store compressed
bytes. A writer closed this way cannot return a fully reset compressor to the
pool, so any task that reuses the dirty compressor may generate wrong data.
Unfortunately, I have not yet found a good way to reproduce this in a small
test case; validation is being done in a real Hudi ingestion job.
> Concurrent call to HoodieMergeHandler cause parquet corruption
> --------------------------------------------------------------
>
> Key: HUDI-2875
> URL: https://issues.apache.org/jira/browse/HUDI-2875
> Project: Apache Hudi
> Issue Type: Bug
> Components: Common Core
> Reporter: ZiyueGuan
> Assignee: ZiyueGuan
> Priority: Major
>
> Problem:
> Some corrupted parquet files are generated, and exceptions such as the
> following are thrown when they are read:
>
> Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value
> at 0 in block -1 in file <FilePath>
> at
> org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:251)
> at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132)
> at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136)
> at
> org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:49)
> at
> org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
> at
> org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:112)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> ... 4 more
> Caused by: org.apache.parquet.io.ParquetDecodingException: could not read
> page Page [bytes.size=1054316, valueCount=237, uncompressedSize=1054316] in
> col required binary col
> at
> org.apache.parquet.column.impl.ColumnReaderImpl.readPageV1(ColumnReaderImpl.java:599)
> at
> org.apache.parquet.column.impl.ColumnReaderImpl.access$300(ColumnReaderImpl.java:57)
> at
> org.apache.parquet.column.impl.ColumnReaderImpl$3.visit(ColumnReaderImpl.java:536)
> at
> org.apache.parquet.column.impl.ColumnReaderImpl$3.visit(ColumnReaderImpl.java:533)
> at org.apache.parquet.column.page.DataPageV1.accept(DataPageV1.java:95)
> at
> org.apache.parquet.column.impl.ColumnReaderImpl.readPage(ColumnReaderImpl.java:533)
> at
> org.apache.parquet.column.impl.ColumnReaderImpl.checkRead(ColumnReaderImpl.java:525)
> at
> org.apache.parquet.column.impl.ColumnReaderImpl.consume(ColumnReaderImpl.java:638)
> at
> org.apache.parquet.column.impl.ColumnReaderImpl.<init>(ColumnReaderImpl.java:353)
> at
> org.apache.parquet.column.impl.ColumnReadStoreImpl.newMemColumnReader(ColumnReadStoreImpl.java:80)
> at
> org.apache.parquet.column.impl.ColumnReadStoreImpl.getColumnReader(ColumnReadStoreImpl.java:75)
> at
> org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:271)
> at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:147)
> at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:109)
> at
> org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:165)
> at
> org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:109)
> at
> org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:137)
> at
> org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:222)
> ... 11 more
> Caused by: java.io.EOFException
> at java.io.DataInputStream.readFully(DataInputStream.java:197)
> at java.io.DataInputStream.readFully(DataInputStream.java:169)
> at
> org.apache.parquet.bytes.BytesInput$StreamBytesInput.toByteArray(BytesInput.java:286)
> at org.apache.parquet.bytes.BytesInput.toByteBuffer(BytesInput.java:237)
> at org.apache.parquet.bytes.BytesInput.toInputStream(BytesInput.java:246)
> at
> org.apache.parquet.column.impl.ColumnReaderImpl.readPageV1(ColumnReaderImpl.java:592)
>
> How to reproduce:
> We need a way to interrupt one task without shutting down the JVM; Spark
> speculation is one such trigger. When speculation kills a task, other tasks
> running on the same executor risk generating a corrupted parquet file. This
> does not always happen: roughly half of the affected tasks fail with an
> exception, while a few succeed without any signal.
> RootCause:
> ParquetWriter is not thread safe, so callers must guarantee that it is never
> invoked concurrently.
> In the following code:
> [https://github.com/apache/hudi/blob/master/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMergeHelper.java#L103]
> we call write and close on the same parquet writer concurrently, so data may
> still be written while close runs. In the close method, the compressor (a
> stateful class parquet uses for compression) is reset and returned to a pool
> for later reuse. Because of the concurrent write, data keeps being pushed
> into the compressor even after it has been reset. The compressor also
> contains a check for invalid use, which is why some of these races throw an
> exception rather than silently produce a corrupted parquet file.
> Validation:
> The current fix has been validated in a production environment. One signal
> that the fix is working is that no tasks should fail any more with errors
> like "BlockCompressorStream: write beyond end of stream", because once
> concurrent writes are eliminated, the BlockCompressorStream check is no
> longer triggered.
--
This message was sent by Atlassian Jira
(v8.20.1#820001)