Matt Cheah created PARQUET-1531:
-----------------------------------
Summary: Page row count limit causes empty pages to be written
from MessageColumnIO
Key: PARQUET-1531
URL: https://issues.apache.org/jira/browse/PARQUET-1531
Project: Parquet
Issue Type: Bug
Reporter: Matt Cheah
This originally manifested as https://issues.apache.org/jira/browse/SPARK-26874,
but we realized that it is fundamentally an issue in how PARQUET-1414's solution
interacts with {{MessageColumnIO}}; Spark is just one user of that API.
In {{MessageColumnIO#endMessage()}}, we first check whether any fields are missing
and fill in their values with null via
{{MessageColumnIO#writeNullForMissingFieldsAtCurrentLevel}}. However, this method
might not actually write any nulls to the underlying page: {{MessageColumnIO}} can
buffer the nulls in memory and flush them to the page store lazily.
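As a rough sketch of that deferral (hypothetical class and member names, not the actual parquet-mr internals), a null write only bumps an in-memory counter, and nothing reaches the page writer until the buffered nulls are explicitly flushed:
{code:java}
// Illustrative model only -- not the real MessageColumnIO code.
// A null for a missing field is merely remembered in memory; the page
// writer sees nothing until the buffered nulls are explicitly flushed.
class BufferedNullsModel {
  private int bufferedNulls = 0; // nulls seen by MessageColumnIO, not yet written
  private int valuesInPage = 0;  // values actually handed to the page writer

  void writeNullForMissingField() {
    bufferedNulls++;             // no call into the page writer happens here
  }

  void flushBufferedNulls() {
    valuesInPage += bufferedNulls;
    bufferedNulls = 0;
  }

  public static void main(String[] args) {
    BufferedNullsModel column = new BufferedNullsModel();
    column.writeNullForMissingField();
    // Prints "buffered=1, inPage=0": the null is counted but not written.
    System.out.println("buffered=" + column.bufferedNulls + ", inPage=" + column.valuesInPage);
  }
}
{code}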
Regardless of whether the nulls have been flushed to the page store,
{{MessageColumnIO#endMessage}} always calls {{columns#endRecord()}}, which signals
to the {{ColumnWriteStore}} that a record was written. At that point, the write
store increments the row count for the current page by 1 and then checks whether
the page needs to be flushed because it has hit the page row count limit.
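A minimal sketch of that bookkeeping (hypothetical names; the real logic lives in the {{ColumnWriteStore}} implementations): the per-page row counter advances on every {{endRecord()}}, independently of how many values have actually been written to the page:
{code:java}
// Illustrative sketch, not the real ColumnWriteStore code: the per-page row
// counter goes up on every endRecord(), and hitting the limit flushes the
// page whether or not any values were delivered to it.
class RowCountLimitModel {
  private final int pageRowCountLimit;
  private int rowsInCurrentPage = 0;

  RowCountLimitModel(int pageRowCountLimit) {
    this.pageRowCountLimit = pageRowCountLimit;
  }

  // Called once per record from endMessage() via columns.endRecord().
  void endRecord() {
    rowsInCurrentPage++;
    if (rowsInCurrentPage >= pageRowCountLimit) {
      writePage(); // flushes whatever the page currently holds -- possibly nothing
      rowsInCurrentPage = 0;
    }
  }

  private void writePage() {
    System.out.println("writing a page");
  }

  public static void main(String[] args) {
    RowCountLimitModel store = new RowCountLimitModel(1);
    store.endRecord(); // triggers a page write even though no values were added
  }
}
{code}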
The problem is that, with the above writing scheme, {{MessageColumnIO}} can cause
empty pages to be written to Parquet files, and empty pages are not readable by
Parquet readers. Suppose the page row count limit is N and {{MessageColumnIO}}
receives N nulls for a column. {{MessageColumnIO}} may buffer all of those nulls
in memory without flushing any of them to the writer. On the Nth call to
{{endMessage()}}, however, the column store counts N records and concludes that
the page has hit the row count limit, even though no values have actually been
delivered to the page writer. The page writer then flushes an empty page
regardless.
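The sequence can be condensed into a self-contained simulation of a single {{endMessage()}} call with the limit set to 1 (hypothetical variable names that mirror the illustrative sketches above, not the real parquet-mr code):
{code:java}
// Self-contained simulation of the failure scenario with
// parquet.page.row.count.limit = 1 (hypothetical names, not parquet-mr code).
public class EmptyPageScenario {
  public static void main(String[] args) {
    final int pageRowCountLimit = 1;
    int bufferedNulls = 0;     // nulls held in MessageColumnIO, not yet written
    int valuesInPage = 0;      // values actually handed to the page writer
    int rowsInCurrentPage = 0; // rows counted by the ColumnWriteStore

    // endMessage() for a record whose only field is null:
    bufferedNulls++;           // writeNullForMissingFieldsAtCurrentLevel buffers the null
    rowsInCurrentPage++;       // columns.endRecord() still counts the row

    if (rowsInCurrentPage >= pageRowCountLimit) {
      // The row count limit triggers a page flush, but the buffered null never
      // reached the page writer, so the flushed page contains 0 values.
      System.out.println("flushing page: values=" + valuesInPage
          + ", nulls still buffered=" + bufferedNulls);
    }
  }
}
{code}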
To illustrate the problem, one can run this simple example inserted into Spark's
{{ParquetIOSuite}}, with Spark upgraded to use the master branch of Parquet.
Attach a debugger to {{MessageColumnIO#endMessage()}} and trace the logic: the
column writer will push a page with 0 values.
{code:java}
test("PARQUET-1414 Problems") {
  // Manually adjust the maximum row count to reproduce the issue on small data
  sparkContext.hadoopConfiguration.set("parquet.page.row.count.limit", "1")
  withTempPath { location =>
    val path = new Path(location.getCanonicalPath + "/parquet-data")
    val schema = StructType(
      Array(StructField("timestamps1", ArrayType(TimestampType))))
    val rows = ListBuffer[Row]()
    for (j <- 0 until 10) {
      rows += Row(null.asInstanceOf[Array[java.sql.Timestamp]])
    }
    val srcDf = spark.createDataFrame(
      sparkContext.parallelize(rows, 3),
      schema,
      true)
    srcDf.write.parquet(path.toString)
    assert(spark.read.parquet(path.toString).collect.size > 0)
  }
}{code}