ZiyueGuan created HUDI-2875:
-------------------------------

             Summary: Concurrent call to HoodieMergeHandler causes parquet corruption
                 Key: HUDI-2875
                 URL: https://issues.apache.org/jira/browse/HUDI-2875
             Project: Apache Hudi
          Issue Type: Bug
          Components: Common Core
            Reporter: ZiyueGuan


Problem:

Some corrupted parquet files are generated, and exceptions are thrown when they are read.

e.g.

 
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file <FilePath>
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:251)
    at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132)
    at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136)
    at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:49)
    at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:112)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    ... 4 more
Caused by: org.apache.parquet.io.ParquetDecodingException: could not read page Page [bytes.size=1054316, valueCount=237, uncompressedSize=1054316] in col required binary col
    at org.apache.parquet.column.impl.ColumnReaderImpl.readPageV1(ColumnReaderImpl.java:599)
    at org.apache.parquet.column.impl.ColumnReaderImpl.access$300(ColumnReaderImpl.java:57)
    at org.apache.parquet.column.impl.ColumnReaderImpl$3.visit(ColumnReaderImpl.java:536)
    at org.apache.parquet.column.impl.ColumnReaderImpl$3.visit(ColumnReaderImpl.java:533)
    at org.apache.parquet.column.page.DataPageV1.accept(DataPageV1.java:95)
    at org.apache.parquet.column.impl.ColumnReaderImpl.readPage(ColumnReaderImpl.java:533)
    at org.apache.parquet.column.impl.ColumnReaderImpl.checkRead(ColumnReaderImpl.java:525)
    at org.apache.parquet.column.impl.ColumnReaderImpl.consume(ColumnReaderImpl.java:638)
    at org.apache.parquet.column.impl.ColumnReaderImpl.<init>(ColumnReaderImpl.java:353)
    at org.apache.parquet.column.impl.ColumnReadStoreImpl.newMemColumnReader(ColumnReadStoreImpl.java:80)
    at org.apache.parquet.column.impl.ColumnReadStoreImpl.getColumnReader(ColumnReadStoreImpl.java:75)
    at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:271)
    at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:147)
    at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:109)
    at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:165)
    at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:109)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:137)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:222)
    ... 11 more
Caused by: java.io.EOFException
    at java.io.DataInputStream.readFully(DataInputStream.java:197)
    at java.io.DataInputStream.readFully(DataInputStream.java:169)
    at org.apache.parquet.bytes.BytesInput$StreamBytesInput.toByteArray(BytesInput.java:286)
    at org.apache.parquet.bytes.BytesInput.toByteBuffer(BytesInput.java:237)
    at org.apache.parquet.bytes.BytesInput.toInputStream(BytesInput.java:246)
    at org.apache.parquet.column.impl.ColumnReaderImpl.readPageV1(ColumnReaderImpl.java:592)
 

How to reproduce:

We need a way to interrupt a task without shutting down the JVM; Spark speculative execution is one such way. Once speculation kills a task attempt, other tasks running at the same time or later risk producing corrupted parquet files. Roughly half of them throw an exception when the file is read back, while a few tasks succeed without any sign of a problem. A configuration sketch for triggering speculation is given below.
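
For reference, a minimal sketch of standard Spark settings that make speculation fire more aggressively; the thresholds below are only illustrative and are not part of this report:

import org.apache.spark.SparkConf;

public class SpeculationRepro {
  public static void main(String[] args) {
    // Standard Spark speculative-execution settings. The values are only
    // illustrative knobs to make speculative attempts fire (and get killed)
    // more often.
    SparkConf conf = new SparkConf()
        .set("spark.speculation", "true")
        .set("spark.speculation.quantile", "0.5")     // default is 0.75
        .set("spark.speculation.multiplier", "1.2");  // default is 1.5
    // Run the Hudi ingestion job with this conf and look for killed
    // speculative attempts that overlap in time with other merge tasks.
  }
}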

Root cause:

ParquetWriter is not thread safe, so its caller must guarantee that it is never invoked concurrently. In the following code:

https://github.com/apache/hudi/blob/master/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMergeHelper.java#L103

write and close are called on the same parquet writer from different threads. Internally, the parquet writer borrows a Compressor from a shared pool to hold compressed bytes. A writer closed while a write is still in flight may return the compressor to the pool without fully resetting it, so any later task that reuses this dirty compressor may generate corrupted data. A minimal sketch of the unsafe pattern follows.
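
The sketch below is not the actual Hudi code; the executor wiring and the records iterable are hypothetical placeholders. It only illustrates a consumer thread writing to a ParquetWriter while the main thread, reacting to a task interrupt, closes the same instance:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.avro.generic.GenericRecord;
import org.apache.parquet.hadoop.ParquetWriter;

public class UnsafeWriteCloseSketch {

  // 'writer' and 'records' stand in for the merge handle's parquet writer and
  // the merged record stream; this wiring is hypothetical, not Hudi's code.
  static void runMerge(ParquetWriter<GenericRecord> writer,
                       Iterable<GenericRecord> records) throws Exception {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    Future<?> consumer = pool.submit(() -> {
      try {
        for (GenericRecord record : records) {
          writer.write(record);        // consumer thread is writing...
        }
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    });
    try {
      consumer.get();                  // an interrupt (e.g. a killed speculative
                                       // attempt) can land here before the
                                       // consumer thread has finished
    } finally {
      // ...while this thread closes the same writer. ParquetWriter is not
      // thread safe, so close() can hand a half-used ("dirty") compressor back
      // to the shared pool, corrupting files written by later tasks.
      writer.close();
      pool.shutdownNow();
    }
  }
}

Whatever fix is chosen, close must not run while another thread can still be inside write, e.g. by closing only after the producer/consumer executor has fully terminated, or by serializing both calls on a common lock.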

 

Unfortunately, I haven't come up with a good way to reproduce this in a small test case. Validation is being done in a real Hudi ingestion job.


