[https://issues.apache.org/jira/browse/PARQUET-1632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16903082#comment-16903082]

Ivan Sadikov edited comment on PARQUET-1632 at 8/8/19 3:43 PM:
---------------------------------------------------------------

I don't think your assessment is correct. Yes, it overflows when we cast size() 
to an int, even though the size itself is a long.
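
Just to spell out the arithmetic - a minimal sketch, with the byte count assumed 
from the repro below (40 rows of 64MB strings), not measured from the failing job:
{code:java}
// Sizes assumed from the repro: 40 rows x 64MB strings, uncompressed, no dictionary.
long totalSize = 40L * 64 * 1024 * 1024;   // 2684354560 bytes - fits in a long
int initialSize = (int) totalSize;         // narrowing cast wraps it to -1610612736
// new java.io.ByteArrayOutputStream(initialSize) then throws
// IllegalArgumentException("Negative initial size: ..."), in the same ballpark as
// the -1610612543 in the stack trace (the small difference is presumably
// per-value encoding overhead on top of the raw string bytes).
{code}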

It looks like the problem is in ConcatenatingByteArrayCollector and the 
toByteArray method. The ConcatenatingByteArrayCollector class should handle an 
arbitrary BytesInput by checking its size first and splitting the data across 
slabs (in this particular case, even when the total size is larger than 
Integer.MAX_VALUE) - I'd say that is the whole point of a concatenating byte 
array collector in the first place.
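
For reference, the failing path boils down to the pattern below - a rough 
paraphrase for illustration, not the actual parquet-mr source - where the whole 
input is materialized into a single buffer whose capacity comes from narrowing 
the long size to an int:
{code:java}
// Rough paraphrase of the failing pattern (not the actual parquet-mr source):
// sizing a single in-memory buffer from (int) size() silently wraps for inputs
// larger than Integer.MAX_VALUE, and ByteArrayOutputStream rejects the resulting
// negative capacity.
byte[] toSingleArray(org.apache.parquet.bytes.BytesInput input) throws java.io.IOException {
  long size = input.size();                          // can legitimately exceed Integer.MAX_VALUE
  java.io.ByteArrayOutputStream out =
      new java.io.ByteArrayOutputStream((int) size); // overflow happens in this narrowing cast
  input.writeAllTo(out);                             // streams the data into the single buffer
  return out.toByteArray();
}
{code}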

Note that the BytesInput instance itself did not fail with an overflow; it only 
failed when it was turned into a byte array. If what you mentioned in the comment 
were true, I would expect the failure to happen when the BytesInput instance is 
created.

The collector should be filling up the last slab and allocating new slabs as it 
requests more and more data from BytesInput, similar to an output stream. Based 
on the comments in the code, BytesInput is treated like a stream, so this is a 
bug, IMHO.
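
Something like the sketch below is what I have in mind - a hypothetical 
illustration only (SlabOutputStream is not a parquet-mr class), relying just on 
BytesInput.writeAllTo(OutputStream):
{code:java}
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not the parquet-mr API: feed BytesInput.writeAllTo() into an
// OutputStream that fills fixed-size slabs, so no single allocation ever has to
// exceed Integer.MAX_VALUE.
class SlabOutputStream extends OutputStream {
  private final int slabSize;
  private final List<byte[]> slabs = new ArrayList<>();
  private byte[] current;
  private int pos;

  SlabOutputStream(int slabSize) { this.slabSize = slabSize; }

  @Override
  public void write(int b) {
    write(new byte[] { (byte) b }, 0, 1);
  }

  @Override
  public void write(byte[] buf, int off, int len) {
    while (len > 0) {
      if (current == null || pos == current.length) {
        current = new byte[slabSize];   // allocate the next slab lazily
        slabs.add(current);
        pos = 0;
      }
      int n = Math.min(len, current.length - pos);
      System.arraycopy(buf, off, current, pos, n);
      pos += n; off += n; len -= n;
    }
  }

  List<byte[]> slabs() { return slabs; }
}

// usage (sketch): bytesInput.writeAllTo(new SlabOutputStream(64 << 20));
{code}
The last slab is over-allocated in this sketch and real slab bookkeeping would 
track the used length, but the point stands: the total size never has to fit into 
a single int.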

 

Regarding your comment about the page size check - it is merely a mitigation 
strategy that lets the write get past the error; it is not a fix. I already 
included it in PARQUET-1633.

My understanding is that parquet.page.size.row.check.min affects the page size 
itself. If the page size were larger than 2GB, I would expect the code to fail 
here:

[https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java#L116]

Or here:

[https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java#L123]

Those conditions explicitly check the page size, but the code did not fail 
there.
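
For reference, the checks at those lines are of this general shape (paraphrased 
from memory, not a verbatim quote):
{code:java}
// Paraphrase of the kind of guard at those lines (not a verbatim quote): a single
// page whose size does not fit in an int is rejected explicitly.
if (uncompressedSize > Integer.MAX_VALUE) {
  throw new ParquetEncodingException(
      "Cannot write page larger than Integer.MAX_VALUE bytes: " + uncompressedSize);
}
{code}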



> Negative initial size when writing large values in parquet-mr
> -------------------------------------------------------------
>
>                 Key: PARQUET-1632
>                 URL: https://issues.apache.org/jira/browse/PARQUET-1632
>             Project: Parquet
>          Issue Type: Bug
>          Components: parquet-mr
>    Affects Versions: 1.10.1
>            Reporter: Ivan Sadikov
>            Assignee: Junjie Chen
>            Priority: Major
>
> I encountered an issue when writing large string values to Parquet.
> Here is the code to reproduce the issue:
> {code:java}
> import org.apache.spark.sql.functions._
> def longString: String = "a" * (64 * 1024 * 1024)
> val long_string = udf(() => longString)
> val df = spark.range(0, 40, 1, 1).withColumn("large_str", long_string())
> spark.conf.set("parquet.enable.dictionary", "false")
> df.write.option("compression", "uncompressed").mode("overwrite").parquet("/tmp/large.parquet"){code}
>  
> This Spark job fails with the exception:
> {code:java}
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
> Task 0 in stage 10861.0 failed 4 times, most recent failure: Lost task 0.3 in 
> stage 10861.0 (TID 671168, 10.0.180.13, executor 14656): 
> org.apache.spark.SparkException: Task failed while writing rows. at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
>  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at 
> org.apache.spark.scheduler.Task.doRunTask(Task.scala:139) at 
> org.apache.spark.scheduler.Task.run(Task.scala:112) at 
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497)
>  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1526) at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503) at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748) 
> Caused by: java.lang.IllegalArgumentException: Negative initial size: 
> -1610612543 at 
> java.io.ByteArrayOutputStream.<init>(ByteArrayOutputStream.java:74) at 
> org.apache.parquet.bytes.BytesInput$BAOS.<init>(BytesInput.java:234) at 
> org.apache.parquet.bytes.BytesInput$BAOS.<init>(BytesInput.java:232) at 
> org.apache.parquet.bytes.BytesInput.toByteArray(BytesInput.java:202) at 
> org.apache.parquet.bytes.ConcatenatingByteArrayCollector.collect(ConcatenatingByteArrayCollector.java:33)
>  at 
> org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:126)
>  at 
> org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:147)
>  at 
> org.apache.parquet.column.impl.ColumnWriterV1.flush(ColumnWriterV1.java:235) 
> at 
> org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:122)
>  at 
> org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:172)
>  at 
> org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:114)
>  at 
> org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:165)
>  at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetOutputWriter.scala:42)
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseResources(FileFormatDataWriter.scala:57)
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:74)
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:247)
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
>  at 
> org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1560)
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
>  ... 11 more{code}
>  
> I would appreciate it if you could help address this problem. Thanks!


