[ https://issues.apache.org/jira/browse/HUDI-2875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ZiyueGuan reassigned HUDI-2875:
-------------------------------
Assignee: ZiyueGuan
> Concurrent call to HoodieMergeHandler causes parquet corruption
> ---------------------------------------------------------------
>
> Key: HUDI-2875
> URL: https://issues.apache.org/jira/browse/HUDI-2875
> Project: Apache Hudi
> Issue Type: Bug
> Components: Common Core
> Reporter: ZiyueGuan
> Assignee: ZiyueGuan
> Priority: Major
>
> Problem:
> Some corrupted parquet files are generated, and exceptions such as the following are thrown when they are read:
>
> Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file <FilePath>
>   at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:251)
>   at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132)
>   at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136)
>   at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:49)
>   at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
>   at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:112)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   ... 4 more
> Caused by: org.apache.parquet.io.ParquetDecodingException: could not read page Page [bytes.size=1054316, valueCount=237, uncompressedSize=1054316] in col required binary col
>   at org.apache.parquet.column.impl.ColumnReaderImpl.readPageV1(ColumnReaderImpl.java:599)
>   at org.apache.parquet.column.impl.ColumnReaderImpl.access$300(ColumnReaderImpl.java:57)
>   at org.apache.parquet.column.impl.ColumnReaderImpl$3.visit(ColumnReaderImpl.java:536)
>   at org.apache.parquet.column.impl.ColumnReaderImpl$3.visit(ColumnReaderImpl.java:533)
>   at org.apache.parquet.column.page.DataPageV1.accept(DataPageV1.java:95)
>   at org.apache.parquet.column.impl.ColumnReaderImpl.readPage(ColumnReaderImpl.java:533)
>   at org.apache.parquet.column.impl.ColumnReaderImpl.checkRead(ColumnReaderImpl.java:525)
>   at org.apache.parquet.column.impl.ColumnReaderImpl.consume(ColumnReaderImpl.java:638)
>   at org.apache.parquet.column.impl.ColumnReaderImpl.<init>(ColumnReaderImpl.java:353)
>   at org.apache.parquet.column.impl.ColumnReadStoreImpl.newMemColumnReader(ColumnReadStoreImpl.java:80)
>   at org.apache.parquet.column.impl.ColumnReadStoreImpl.getColumnReader(ColumnReadStoreImpl.java:75)
>   at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:271)
>   at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:147)
>   at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:109)
>   at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:165)
>   at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:109)
>   at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:137)
>   at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:222)
>   ... 11 more
> Caused by: java.io.EOFException
>   at java.io.DataInputStream.readFully(DataInputStream.java:197)
>   at java.io.DataInputStream.readFully(DataInputStream.java:169)
>   at org.apache.parquet.bytes.BytesInput$StreamBytesInput.toByteArray(BytesInput.java:286)
>   at org.apache.parquet.bytes.BytesInput.toByteBuffer(BytesInput.java:237)
>   at org.apache.parquet.bytes.BytesInput.toInputStream(BytesInput.java:246)
>   at org.apache.parquet.column.impl.ColumnReaderImpl.readPageV1(ColumnReaderImpl.java:592)
>
> How to reproduce:
> We need a way to interrupt a task without shutting down the JVM, for example Spark speculation. When speculation is triggered, other tasks running at the same time or later in the same JVM risk writing corrupted parquet files. Roughly half of them throw an exception, while a few succeed without any sign of the problem.
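>
> As an illustration, a hedged sketch of Spark settings one might use to make speculation fire aggressively in an ingestion job (the class name and threshold values below are illustrative assumptions, not recommendations; the config keys are standard Spark settings):
> {code:java}
> import org.apache.spark.SparkConf;
>
> public class SpeculationRepro {  // hypothetical driver-side snippet
>   public static void main(String[] args) {
>     // Aggressive speculation raises the chance that a task attempt is killed
>     // mid-write while the executor JVM (and its compressor pool) keeps running.
>     SparkConf conf = new SparkConf()
>         .set("spark.speculation", "true")
>         .set("spark.speculation.interval", "100ms")   // check for slow tasks frequently
>         .set("spark.speculation.multiplier", "1.1")   // flag a task as slow at 1.1x the median runtime
>         .set("spark.speculation.quantile", "0.5");    // start speculating once half the tasks have finished
>     // ... build the SparkSession / submit the Hudi ingestion job with this conf ...
>   }
> }
> {code}
>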
> Root cause:
> ParquetWriter is not thread safe; its caller must guarantee that it is never invoked concurrently. In the following code:
> https://github.com/apache/hudi/blob/master/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMergeHelper.java#L103
> we call write and close on the parquet writer concurrently. Internally the parquet writer borrows a Compressor from a shared pool to hold compressed bytes. A writer closed this way may return its compressor to the pool without fully resetting it, so any task that later reuses the dirty compressor can write corrupted data.
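>
> For illustration, a minimal sketch of the kind of guarding this calls for: route write() and close() through one lock so close() can never interleave with an in-flight write(). ParquetWriter is the real parquet-mr class; the GuardedWriter wrapper is a hypothetical example, not the actual Hudi fix:
> {code:java}
> import java.io.IOException;
> import java.util.concurrent.locks.ReentrantLock;
>
> import org.apache.parquet.hadoop.ParquetWriter;
>
> /** Hypothetical wrapper: serializes write() and close() so they never overlap. */
> public class GuardedWriter<T> implements AutoCloseable {
>
>   private final ParquetWriter<T> delegate;
>   private final ReentrantLock lock = new ReentrantLock();
>   private boolean closed = false;
>
>   public GuardedWriter(ParquetWriter<T> delegate) {
>     this.delegate = delegate;
>   }
>
>   public void write(T record) throws IOException {
>     lock.lock();
>     try {
>       if (closed) {
>         throw new IOException("writer already closed");
>       }
>       delegate.write(record);
>     } finally {
>       lock.unlock();
>     }
>   }
>
>   @Override
>   public void close() throws IOException {
>     lock.lock();
>     try {
>       if (!closed) {
>         closed = true;
>         // close() runs only after any in-flight write() has finished, so the
>         // compressor handed back to the pool is not left in a dirty state.
>         delegate.close();
>       }
>     } finally {
>       lock.unlock();
>     }
>   }
> }
> {code}
> With such a wrapper, a concurrent close() simply waits for the current write() to return, and later write() calls fail fast instead of corrupting the file.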
>
> Unfortunately, I haven't come up with a good way to reproduce this in a small test case. Validation is being done in a real Hudi ingestion job.