Artem Shnayder created PARQUET-1919:
---------------------------------------
Summary: Buffer int overflow in CapacityByteArrayOutputStream, SnappyCompressor
Key: PARQUET-1919
URL: https://issues.apache.org/jira/browse/PARQUET-1919
Project: Parquet
Issue Type: Bug
Components: parquet-mr
Affects Versions: 1.10.1
Reporter: Artem Shnayder
During an attempted write operation, a buffer position integer overflow results in an IllegalArgumentException: Negative capacity: -2147336621.
{noformat}
20/10/06 15:30:39 INFO HadoopRDD: Input split: s3a://<prefix>/part-00015-96362e5d-d047-4f31-812b-38ff79f6919c-c000.txt.bz2:268435456+33554432
20/10/06 17:23:37 ERROR Utils: Aborting task
java.lang.IllegalArgumentException: Negative capacity: -2147336621
    at java.nio.Buffer.<init>(Buffer.java:199)
    at java.nio.ByteBuffer.<init>(ByteBuffer.java:281)
    at java.nio.ByteBuffer.<init>(ByteBuffer.java:289)
    at java.nio.MappedByteBuffer.<init>(MappedByteBuffer.java:89)
    at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:119)
    at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
    at org.apache.parquet.hadoop.codec.SnappyCompressor.setInput(SnappyCompressor.java:97)
    at org.apache.parquet.hadoop.codec.NonBlockedCompressorStream.write(NonBlockedCompressorStream.java:48)
    at org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeToOutput(CapacityByteArrayOutputStream.java:227)
    at org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeTo(CapacityByteArrayOutputStream.java:247)
    at org.apache.parquet.bytes.BytesInput$CapacityBAOSBytesInput.writeAllTo(BytesInput.java:405)
    at org.apache.parquet.bytes.BytesInput$SequenceBytesIn.writeAllTo(BytesInput.java:296)
    at org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.compress(CodecFactory.java:164)
    at org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:95)
    at org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:147)
    at org.apache.parquet.column.impl.ColumnWriterV1.flush(ColumnWriterV1.java:235)
    at org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:122)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:172)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.checkBlockSizeReached(InternalParquetRecordWriter.java:148)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:130)
    at org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:182)
    at org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:44)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.write(ParquetOutputWriter.scala:40)
    at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.write(FileFormatDataWriter.scala:137)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:239)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:245)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:168)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
{noformat}
2,147,483,647 (Integer.MAX_VALUE) - 2,147,336,621 (the magnitude of the reported negative capacity) = 147,026.
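For illustration, a minimal sketch of how 32-bit int addition wraps to exactly that value; the operands below are invented so the wrapped sum matches the number in the trace and are not taken from SnappyCompressor:

{code:java}
public class NegativeCapacityDemo {
  public static void main(String[] args) {
    // Hypothetical operands, chosen only so the wrapped sum matches the trace.
    int bufferedBytes = 2_147_483_000; // just under Integer.MAX_VALUE
    int incomingLen = 147_675;         // a modest additional write

    // int addition silently wraps past Integer.MAX_VALUE.
    int newCapacity = bufferedBytes + incomingLen;
    System.out.println(newCapacity); // prints -2147336621

    // Passing this to ByteBuffer.allocateDirect(newCapacity) produces
    // IllegalArgumentException: Negative capacity: -2147336621
  }
}
{code}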
The input bz2 files are all roughly 900 MiB in size. The target Parquet part files are 1.7 GiB in size.

Increasing the partition count from 64 to 1024 works around the issue; the output Parquet part files drop to 100 MiB in size.
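For reference, a minimal sketch of that workaround as a Spark job (the session setup, input format, and paths are placeholders, not the original job):

{code:java}
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class RepartitionWorkaround {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("repartition-workaround")
        .getOrCreate();

    // Placeholder read; the real job parses the ~900 MiB .txt.bz2 parts.
    Dataset<Row> df = spark.read().text("s3a://<prefix>/");

    // 1024 output partitions instead of 64 keeps the bytes buffered per
    // column chunk / page writer well under Integer.MAX_VALUE.
    df.repartition(1024)
      .write()
      .mode(SaveMode.Overwrite)
      .parquet("s3a://<prefix>/output/");

    spark.stop();
  }
}
{code}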
However, it's unclear to me what the root cause is and why increasing the partition count helps. Was it an unlucky row grouping that pushed the buffer size roughly 147 KB past the int limit, i.e., would any change in partition count, up or down, have helped? Or is it related to approaching the Parquet part file size limit?
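For context on where the negative value could come from: the top parquet-mr frame is SnappyCompressor.setInput calling ByteBuffer.allocateDirect. The sketch below shows a generic grow-the-input-buffer pattern (an assumption about the shape of that code path, not a copy of it) in which position() + len is computed as an int and can wrap negative once the buffered bytes approach Integer.MAX_VALUE:

{code:java}
import java.nio.ByteBuffer;

// Hypothetical sketch of a grow-on-write direct buffer, to illustrate the
// failure mode; this is NOT the actual SnappyCompressor implementation.
class GrowableDirectBuffer {
  private ByteBuffer inputBuffer = ByteBuffer.allocateDirect(128 * 1024);

  void setInput(byte[] buffer, int off, int len) {
    if (inputBuffer.capacity() - inputBuffer.position() < len) {
      // int arithmetic: once position() + len exceeds Integer.MAX_VALUE the
      // sum wraps negative and allocateDirect throws
      // IllegalArgumentException("Negative capacity: ...").
      ByteBuffer bigger = ByteBuffer.allocateDirect(inputBuffer.position() + len);
      inputBuffer.flip();
      bigger.put(inputBuffer);
      inputBuffer = bigger;
    }
    inputBuffer.put(buffer, off, len);
  }
}
{code}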
This issue seems related to PARQUET-1632, but this stack trace does not involve ConcatenatingByteArrayCollector, which potentially means a distinct root cause. The input dataset does have large string columns (up to 10 MB), but nothing close to the roughly 2.4 GB value, past the signed int max, that was produced in PARQUET-1632.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)