Artem Shnayder created PARQUET-1919:
---------------------------------------

             Summary: Buffer int overflow in CapacityByteArrayOutputStream, SnappyCompressor
                 Key: PARQUET-1919
                 URL: https://issues.apache.org/jira/browse/PARQUET-1919
             Project: Parquet
          Issue Type: Bug
          Components: parquet-mr
    Affects Versions: 1.10.1
            Reporter: Artem Shnayder


During an attempted write operation, a buffer position integer overflow results in an IllegalArgumentException: Negative capacity: -2147336621.
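For illustration, here is a minimal, simplified sketch of that failure mode. The numbers are hypothetical (chosen to reproduce the reported value) and this is not the actual SnappyCompressor code; it only shows how an int position plus an int length can wrap negative and reach ByteBuffer.allocateDirect:

{noformat}
import java.nio.ByteBuffer;

// Simplified sketch, not the real parquet-mr code: if the bytes already buffered
// plus the next chunk exceed Integer.MAX_VALUE, the int sum wraps negative and
// ByteBuffer.allocateDirect rejects it.
public class NegativeCapacitySketch {
  public static void main(String[] args) {
    int buffered = 2_147_400_000;      // hypothetical bytes already buffered
    int len      = 230_675;            // hypothetical next chunk to compress
    int needed   = buffered + len;     // int overflow: prints -2147336621
    System.out.println("needed = " + needed);
    ByteBuffer.allocateDirect(needed); // IllegalArgumentException ("Negative capacity: ..." on Java 8)
  }
}
{noformat}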

 
{noformat}
20/10/06 15:30:39 INFO HadoopRDD: Input split: s3a://<prefix>/part-00015-96362e5d-d047-4f31-812b-38ff79f6919c-c000.txt.bz2:268435456+33554432
20/10/06 17:23:37 ERROR Utils: Aborting task
java.lang.IllegalArgumentException: Negative capacity: -2147336621
        at java.nio.Buffer.<init>(Buffer.java:199)
        at java.nio.ByteBuffer.<init>(ByteBuffer.java:281)
        at java.nio.ByteBuffer.<init>(ByteBuffer.java:289)
        at java.nio.MappedByteBuffer.<init>(MappedByteBuffer.java:89)
        at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:119)
        at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
        at org.apache.parquet.hadoop.codec.SnappyCompressor.setInput(SnappyCompressor.java:97)
        at org.apache.parquet.hadoop.codec.NonBlockedCompressorStream.write(NonBlockedCompressorStream.java:48)
        at org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeToOutput(CapacityByteArrayOutputStream.java:227)
        at org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeTo(CapacityByteArrayOutputStream.java:247)
        at org.apache.parquet.bytes.BytesInput$CapacityBAOSBytesInput.writeAllTo(BytesInput.java:405)
        at org.apache.parquet.bytes.BytesInput$SequenceBytesIn.writeAllTo(BytesInput.java:296)
        at org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.compress(CodecFactory.java:164)
        at org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:95)
        at org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:147)
        at org.apache.parquet.column.impl.ColumnWriterV1.flush(ColumnWriterV1.java:235)
        at org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:122)
        at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:172)
        at org.apache.parquet.hadoop.InternalParquetRecordWriter.checkBlockSizeReached(InternalParquetRecordWriter.java:148)
        at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:130)
        at org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:182)
        at org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:44)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.write(ParquetOutputWriter.scala:40)
        at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.write(FileFormatDataWriter.scala:137)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:239)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:245)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:168)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
{noformat}
2,147,483,647 (max signed int) - 2,147,336,621 (magnitude of the negative capacity) = 147,026 bytes.
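Reading the reported value as a plain 32-bit wrap gives the capacity that was effectively being requested; a small sketch of that arithmetic (assuming the sum wrapped exactly once):

{noformat}
// Sketch: interpret the reported capacity as a single 32-bit wrap-around.
public class ImpliedCapacity {
  public static void main(String[] args) {
    long reported = -2_147_336_621L;             // capacity from the exception
    long implied  = reported + (1L << 32);       // 2,147,630,675 bytes actually needed
    long overBy   = implied - Integer.MAX_VALUE; // 147,028 bytes past Integer.MAX_VALUE
    // (The MAX_VALUE - |reported| figure above is 147,026; the wrap math differs
    //  by 2 because 2^32 = 2 * Integer.MAX_VALUE + 2. Either way, ~147 KB over.)
    System.out.println(implied + " = Integer.MAX_VALUE + " + overBy);
  }
}
{noformat}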

The input bz2 files are all roughly 900 MiB in size. The target parquet part 
files are 1.7 GiB in size.

Increasing the partition count from 64 to 1024 works around the issue; the output parquet part files drop to roughly 100 MiB each.
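For reference, the workaround boils down to repartitioning before the write. A sketch against the Spark Java API; the paths, the parsing of the input rows, and the session setup are placeholders, not the actual job:

{noformat}
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class RepartitionWorkaround {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("repartition-workaround").getOrCreate();
    // Placeholder read; the real job parses the bz2 text rows into columns.
    Dataset<Row> df = spark.read().text("s3a://<prefix>/*.txt.bz2");
    df.repartition(1024)                   // was 64; 1024 keeps part files around 100 MiB
      .write()
      .parquet("s3a://<prefix>/output/");  // placeholder output path
    spark.stop();
  }
}
{noformat}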

However, it's unclear to me what the root cause is and why increasing the partition count helps. Was it an unlucky row grouping that pushed the buffer size past the int limit by ~147 KB, i.e., would any change in partition count, up or down, have helped? Or is it approaching the parquet part file size limit?

This issue seems related to PARQUET-1632, but this stack trace does not go through ConcatenatingByteArrayCollector, which potentially means a distinct root cause. The input dataset does have large string columns (up to 10 MB), but nothing close to the signed int max (~2.1 GB) or the ~2.4 GB value produced in PARQUET-1632.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
