[
https://issues.apache.org/jira/browse/PARQUET-1531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17165601#comment-17165601
]
ASF GitHub Bot commented on PARQUET-1531:
-----------------------------------------
gszadovszky commented on a change in pull request #620:
URL: https://github.com/apache/parquet-mr/pull/620#discussion_r460799987
##########
File path:
parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
##########
@@ -305,6 +305,9 @@ long getRowsWrittenSoFar() {
* Writes the current data to a new page in the page store
*/
void writePage() {
+ if (valueCount == 0) {
+ throw new ParquetEncodingException("writing empty page");
Review comment:
You may check my answer at the referenced StackOverFlow thread.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Page row count limit causes empty pages to be written from MessageColumnIO
> --------------------------------------------------------------------------
>
> Key: PARQUET-1531
> URL: https://issues.apache.org/jira/browse/PARQUET-1531
> Project: Parquet
> Issue Type: Bug
> Affects Versions: 1.11.0
> Reporter: Matt Cheah
> Assignee: Gabor Szadovszky
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.11.0
>
>
> This originally manifested as
> https://issues.apache.org/jira/browse/SPARK-26874 but we realized that this
> is fundamentally an issue in the way PARQUET-1414's solution interacts with
> {{MessageColumnIO}}, where Spark is one such user of that API.
> In {{MessageColumnIO#endMessage()}}, we first examine if any fields are
> missing and fill in the values with null in
> {{MessageColumnIO#writeNullForMissingFieldsAtCurrentLevel}}. However, this
> method might not actually write any nulls to the underlying page.
> {{MessageColumnIO}} can buffer nulls in memory and flush them to the page
> store lazily.
> Regardless of whether or not nulls are flushed to the page store, in
> {{MessageColumnIO#endMessage}} we always call {{columns#endRecord()}} which
> will signal to the {{ColumnWriteStore}} that a record was written. At that
> point, the write store increments the row count for the current page by 1,
> and then check if the page needs to be flushed due to hitting the page row
> count limit.
> The problem is that with the above writing scheme, {{MessageColumnIO}} can
> cause empty pages to be written to Parquet files, and empty pages are not
> readable by Parquet readers. Suppose the page row count limit is N, and the
> {{MessageColumnIO}} receives N nulls for a column. The {{MessageColumnIO}}
> will buffer the nulls in memory, and doesn't necessarily flush the nulls to
> the writer yet. On the Nth call to {{endMessage()}}, however, the column
> store will think there are N values in memory and that the page has hit the
> row count limit, despite the fact that no rows have actually been written at
> all. But the underlying page writer will write an empty page regardless.
> To illustrate the problem, one can try running this simple example inserted
> into Spark's \{{ParquetIOSuite}} when Spark has been upgraded to use the
> master branch of Parquet. Attach a debugger to
> {{MessageColumnIO#endMessage()}} and trace the logic accordingly - the column
> writer will push a page with 0 values:
> {code:java}
> test("PARQUET-1414 Problems") {
> // Manually adjust the maximum row count to reproduce the issue on small
> data
> sparkContext.hadoopConfiguration.set("parquet.page.row.count.limit", "1")
> withTempPath { location =>
> val path = new Path(location.getCanonicalPath + "/parquet-data")
> val schema = StructType(
> Array(StructField("timestamps1", ArrayType(TimestampType))))
> val rows = ListBuffer[Row]()
> for (j <- 0 until 10) {
> rows += Row(
> null.asInstanceOf[Array[java.sql.Timestamp]])
> }
> val srcDf = spark.createDataFrame(
> sparkContext.parallelize(rows, 3),
> schema,
> true)
> srcDf.write.parquet(path.toString)
> assert(spark.read.parquet(path.toString).collect.size > 0)
> }
> }{code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)