[
https://issues.apache.org/jira/browse/PARQUET-1531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated PARQUET-1531:
------------------------------------
Labels: pull-request-available (was: )
> Page row count limit causes empty pages to be written from MessageColumnIO
> --------------------------------------------------------------------------
>
> Key: PARQUET-1531
> URL: https://issues.apache.org/jira/browse/PARQUET-1531
> Project: Parquet
> Issue Type: Bug
> Reporter: Matt Cheah
> Assignee: Gabor Szadovszky
> Priority: Major
> Labels: pull-request-available
>
> This originally manifested as
> https://issues.apache.org/jira/browse/SPARK-26874, but we realized that it
> is fundamentally an issue in the way PARQUET-1414's solution interacts with
> {{MessageColumnIO}}, of which Spark is one user.
> In {{MessageColumnIO#endMessage()}}, we first check whether any fields are
> missing and fill in their values with null via
> {{MessageColumnIO#writeNullForMissingFieldsAtCurrentLevel}}. However, this
> method does not necessarily write any nulls to the underlying page:
> {{MessageColumnIO}} can buffer the nulls in memory and flush them to the
> page store lazily.
> Regardless of whether or not the nulls are flushed to the page store,
> {{MessageColumnIO#endMessage}} always calls {{columns#endRecord()}}, which
> signals to the {{ColumnWriteStore}} that a record was written. At that
> point, the write store increments the row count for the current page by 1
> and then checks whether the page needs to be flushed because it has hit
> the page row count limit.
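> As a rough sketch of that call sequence, here is a hand-written, simplified
> model of the two steps (illustrative pseudocode only, not the actual
> parquet-mr implementation; all class and field names below are stand-ins):
> {code:java}
> // Simplified model of the MessageColumnIO#endMessage() flow described above.
> class EndMessageSketch {
>   interface WriteStore { void endRecord(); }
>
>   private int bufferedNulls = 0;     // nulls held in memory, not yet in a page
>   private final WriteStore columns;  // the column write store
>
>   EndMessageSketch(WriteStore columns) { this.columns = columns; }
>
>   void endMessage() {
>     // Step 1: missing fields are filled with nulls, but the nulls may only
>     // be buffered here rather than written through to the page store.
>     bufferedNulls++;
>
>     // Step 2: always executed, whether or not the buffered nulls were
>     // flushed; the write store counts a record and checks the row limit.
>     columns.endRecord();
>   }
> }
> {code}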
> The problem is that, with the above writing scheme, {{MessageColumnIO}} can
> cause empty pages to be written to Parquet files, and empty pages are not
> readable by Parquet readers. Suppose the page row count limit is N and
> {{MessageColumnIO}} receives N nulls for a column. {{MessageColumnIO}}
> buffers those nulls in memory and does not necessarily flush them to the
> writer yet. On the Nth call to {{endMessage()}}, however, the column store
> has counted N records and concludes that the page has hit the row count
> limit, even though none of those values have actually been delivered to
> the underlying page writer. The page writer is therefore flushed and
> writes an empty page.
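> The write-store side of the story can be sketched the same way (again a
> simplified illustration, not the real parquet-mr code; {{rowCount}},
> {{valuesInPage}} and {{pageRowCountLimit}} are hypothetical stand-ins):
> {code:java}
> // Illustrative sketch of the page-flush decision made after endRecord().
> class WriteStoreSketch {
>   private final int pageRowCountLimit;  // e.g. parquet.page.row.count.limit
>   private int rowCount = 0;             // records counted by endRecord()
>   private int valuesInPage = 0;         // values actually delivered to the page
>
>   WriteStoreSketch(int pageRowCountLimit) {
>     this.pageRowCountLimit = pageRowCountLimit;
>   }
>
>   void endRecord() {
>     rowCount++;  // counted even though the nulls may still be buffered upstream
>     if (rowCount >= pageRowCountLimit) {
>       // With a limit of N and N null-only records, this fires while
>       // valuesInPage is still 0, so an empty (unreadable) page is written.
>       writePage(valuesInPage);
>       rowCount = 0;
>     }
>   }
>
>   private void writePage(int valueCount) {
>     System.out.println("flushing page with " + valueCount + " values");
>   }
> }
> {code}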
> To illustrate the problem, one can run this simple example inserted
> into Spark's {{ParquetIOSuite}} once Spark has been upgraded to use the
> master branch of Parquet. Attach a debugger to
> {{MessageColumnIO#endMessage()}} and trace the logic: the column
> writer will push a page with 0 values.
> {code:java}
> test("PARQUET-1414 Problems") {
> // Manually adjust the maximum row count to reproduce the issue on small
> data
> sparkContext.hadoopConfiguration.set("parquet.page.row.count.limit", "1")
> withTempPath { location =>
> val path = new Path(location.getCanonicalPath + "/parquet-data")
> val schema = StructType(
> Array(StructField("timestamps1", ArrayType(TimestampType))))
> val rows = ListBuffer[Row]()
> for (j <- 0 until 10) {
> rows += Row(
> null.asInstanceOf[Array[java.sql.Timestamp]])
> }
> val srcDf = spark.createDataFrame(
> sparkContext.parallelize(rows, 3),
> schema,
> true)
> srcDf.write.parquet(path.toString)
> assert(spark.read.parquet(path.toString).collect.size > 0)
> }
> }{code}