Matt Cheah created PARQUET-1531:
-----------------------------------

             Summary: Page row count limit causes empty pages to be written 
from MessageColumnIO
                 Key: PARQUET-1531
                 URL: https://issues.apache.org/jira/browse/PARQUET-1531
             Project: Parquet
          Issue Type: Bug
            Reporter: Matt Cheah


This originally manifested as https://issues.apache.org/jira/browse/SPARK-26874, 
but we realized that it is fundamentally an issue in how the solution for 
PARQUET-1414 interacts with {{MessageColumnIO}}; Spark is just one consumer of 
that API.

In {{MessageColumnIO#endMessage()}}, we first check whether any fields are missing 
and fill in their values with null via 
{{MessageColumnIO#writeNullForMissingFieldsAtCurrentLevel}}. However, this 
method might not actually write any nulls to the underlying page: 
{{MessageColumnIO}} can buffer nulls in memory and flush them to the page store 
lazily.
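
As a rough sketch of this buffering behaviour (the class, interface, and method 
names below are hypothetical simplifications for illustration, not the actual 
parquet-mr internals):
{code:java}
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplification of MessageColumnIO's null buffering; the names
// below are illustrative only, not the real parquet-mr internals.
class NullBufferingSketch {

  // Stand-in for the per-column page writer.
  interface PageWriter {
    void writeNull(int repetitionLevel, int definitionLevel);
  }

  private final List<Integer> cachedNullLevels = new ArrayList<>();
  private final PageWriter pageWriter;

  NullBufferingSketch(PageWriter pageWriter) {
    this.pageWriter = pageWriter;
  }

  // Invoked from endMessage() for fields the record never set: the null is only
  // cached in memory, nothing reaches the page yet.
  void writeNullForMissingField(int definitionLevel) {
    cachedNullLevels.add(definitionLevel);
  }

  // The cached nulls are flushed lazily, e.g. once a real value arrives for the
  // same column. Until this runs, the page writer has seen zero values.
  void flushCachedNulls() {
    for (int level : cachedNullLevels) {
      pageWriter.writeNull(0, level);
    }
    cachedNullLevels.clear();
  }
}
{code}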

Regardless of whether or not the nulls are flushed to the page store, 
{{MessageColumnIO#endMessage}} always calls {{columns#endRecord()}}, which 
signals to the {{ColumnWriteStore}} that a record was written. At that point, 
the write store increments the row count for the current page by 1 and then 
checks whether the page needs to be flushed because it hit the page row count 
limit.
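
The row-count bookkeeping can be sketched roughly like this (again with 
hypothetical, simplified names; the real check lives in the 
{{ColumnWriteStore}} implementations added by PARQUET-1414):
{code:java}
// Hypothetical simplification of the ColumnWriteStore side; the names are
// illustrative only. The real row-count limit was introduced by PARQUET-1414.
class RowCountingWriteStoreSketch {

  // Stand-in for the per-column page writer state.
  interface PageFlusher {
    int bufferedValueCount();  // values actually written to the current page
    void flushCurrentPage();   // writes the page out, even if it holds 0 values
  }

  private final PageFlusher flusher;
  private final int pageRowCountLimit;  // e.g. parquet.page.row.count.limit
  private int rowCount = 0;

  RowCountingWriteStoreSketch(PageFlusher flusher, int pageRowCountLimit) {
    this.flusher = flusher;
    this.pageRowCountLimit = pageRowCountLimit;
  }

  // Called unconditionally from MessageColumnIO#endMessage(), whether the
  // record's nulls were flushed or are still cached in memory.
  void endRecord() {
    rowCount++;
    if (rowCount >= pageRowCountLimit) {
      // The row count says the page is "full", but if every counted record only
      // produced cached nulls, bufferedValueCount() can still be 0 and an empty
      // page is written.
      flusher.flushCurrentPage();
      rowCount = 0;
    }
  }
}
{code}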

The problem is that, with the above writing scheme, {{MessageColumnIO}} can 
cause empty pages to be written to Parquet files, and empty pages are not 
readable by Parquet readers. Suppose the page row count limit is N and 
{{MessageColumnIO}} receives N nulls for a column. {{MessageColumnIO}} buffers 
the nulls in memory and does not necessarily flush them to the writer yet. On 
the Nth call to {{endMessage()}}, however, the column store considers the page 
to have hit the row count limit, even though no values have actually been 
written to the page, and the underlying page writer writes out an empty page 
regardless.
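
A toy walk-through of that sequence with a row count limit of 1 (mirroring the 
repro below; this is only a simulation of the described behaviour, not Parquet 
code):
{code:java}
// Toy simulation of the failure sequence with a page row count limit of 1.
public class EmptyPageSequenceSketch {
  public static void main(String[] args) {
    final int pageRowCountLimit = 1;  // parquet.page.row.count.limit
    int cachedNulls = 0;              // nulls buffered by MessageColumnIO, not yet on the page
    int valuesInPage = 0;             // values actually handed to the page writer
    int rowsInPage = 0;               // rows counted via columns#endRecord()

    // endMessage() for a record whose only column value is null:
    cachedNulls++;                    // writeNullForMissingFieldsAtCurrentLevel caches the null
    rowsInPage++;                     // columns#endRecord() counts the row anyway

    if (rowsInPage >= pageRowCountLimit) {
      // The page row count limit is reached while valuesInPage is still 0, so an
      // empty page is flushed; such pages are unreadable by Parquet readers.
      System.out.println("Flushing page with " + valuesInPage + " values ("
          + cachedNulls + " nulls still cached in memory)");
    }
  }
}
{code}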

To illustrate the problem, one can run this simple example inserted into 
Spark's {{ParquetIOSuite}} once Spark has been upgraded to use the master 
branch of Parquet. Attach a debugger to {{MessageColumnIO#endMessage()}} and 
trace the logic: the column writer will push a page with 0 values:
{code:java}
test("PARQUET-1414 Problems") {
  // Manually adjust the maximum row count to reproduce the issue on small data
  sparkContext.hadoopConfiguration.set("parquet.page.row.count.limit", "1")
  withTempPath { location =>
    val path = new Path(location.getCanonicalPath + "/parquet-data")
    val schema = StructType(
      Array(StructField("timestamps1", ArrayType(TimestampType))))
    // Every row carries a null value for the array column, so only nulls are
    // ever produced for it.
    val rows = ListBuffer[Row]()
    for (j <- 0 until 10) {
      rows += Row(
        null.asInstanceOf[Array[java.sql.Timestamp]])
    }
    val srcDf = spark.createDataFrame(
      sparkContext.parallelize(rows, 3),
      schema,
      true)
    srcDf.write.parquet(path.toString)
    // Reading the data back exposes the empty pages produced on write.
    assert(spark.read.parquet(path.toString).collect.size > 0)
  }
}{code}


