[ https://issues.apache.org/jira/browse/PARQUET-1531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Gabor Szadovszky updated PARQUET-1531:
--------------------------------------
    Affects Version/s: 1.11.0

> Page row count limit causes empty pages to be written from MessageColumnIO
> ---------------------------------------------------------------------------
>
>                 Key: PARQUET-1531
>                 URL: https://issues.apache.org/jira/browse/PARQUET-1531
>             Project: Parquet
>          Issue Type: Bug
>    Affects Versions: 1.11.0
>            Reporter: Matt Cheah
>            Assignee: Gabor Szadovszky
>            Priority: Major
>              Labels: pull-request-available
>
> This originally manifested as https://issues.apache.org/jira/browse/SPARK-26874, but we realized that it is fundamentally an issue in how PARQUET-1414's solution interacts with {{MessageColumnIO}}; Spark is one such user of that API.
>
> In {{MessageColumnIO#endMessage()}}, we first check whether any fields are missing and fill in their values with null via {{MessageColumnIO#writeNullForMissingFieldsAtCurrentLevel}}. However, this method might not actually write any nulls to the underlying page: {{MessageColumnIO}} can buffer nulls in memory and flush them to the page store lazily.
>
> Regardless of whether the nulls are flushed to the page store, {{MessageColumnIO#endMessage}} always calls {{columns#endRecord()}}, which signals to the {{ColumnWriteStore}} that a record was written. At that point, the write store increments the row count for the current page by 1 and then checks whether the page needs to be flushed because it has hit the page row count limit.
>
> The problem is that with this writing scheme, {{MessageColumnIO}} can cause empty pages to be written to Parquet files, and empty pages are not readable by Parquet readers. Suppose the page row count limit is N and {{MessageColumnIO}} receives N nulls for a column. {{MessageColumnIO}} buffers the nulls in memory and does not necessarily flush them to the writer yet. On the Nth call to {{endMessage()}}, however, the column store considers the page to have hit the row count limit, despite the fact that no values have actually been written to the page at all, and the underlying page writer writes an empty page regardless. (A simplified model of this interaction follows the repro below.)
>
> To illustrate the problem, one can run this simple example inserted into Spark's {{ParquetIOSuite}} once Spark has been upgraded to use the master branch of Parquet. Attach a debugger to {{MessageColumnIO#endMessage()}} and trace the logic accordingly - the column writer will push a page with 0 values:
> {code:java}
> test("PARQUET-1414 Problems") {
>   // Manually adjust the maximum page row count to reproduce the issue on small data
>   sparkContext.hadoopConfiguration.set("parquet.page.row.count.limit", "1")
>   withTempPath { location =>
>     val path = new Path(location.getCanonicalPath + "/parquet-data")
>     val schema = StructType(
>       Array(StructField("timestamps1", ArrayType(TimestampType))))
>     val rows = ListBuffer[Row]()
>     for (_ <- 0 until 10) {
>       rows += Row(null.asInstanceOf[Array[java.sql.Timestamp]])
>     }
>     val srcDf = spark.createDataFrame(
>       sparkContext.parallelize(rows, 3),
>       schema,
>       true)
>     srcDf.write.parquet(path.toString)
>     assert(spark.read.parquet(path.toString).collect().length > 0)
>   }
> }{code}
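>
> To make the sequence concrete, here is a simplified, self-contained Java model of the interaction described above. This is a sketch, not the actual parquet-mr code: the class and all field names ({{bufferedNulls}}, {{rowsInPage}}, and so on) are illustrative only.
> {code:java}
> // Hypothetical model of one column's write path; not parquet-mr source.
> class ColumnWriteModel {
>   private final int pageRowCountLimit; // parquet.page.row.count.limit
>   private int bufferedNulls = 0;       // nulls MessageColumnIO holds in memory
>   private int valuesInPage = 0;        // values actually handed to the page writer
>   private int rowsInPage = 0;          // rows counted by endRecord()
>
>   ColumnWriteModel(int pageRowCountLimit) {
>     this.pageRowCountLimit = pageRowCountLimit;
>   }
>
>   // Stand-in for writeNullForMissingFieldsAtCurrentLevel: may only buffer.
>   void writeNullForMissingField() {
>     bufferedNulls++;                   // nothing reaches the page writer yet
>   }
>
>   // Stand-in for ColumnWriteStore#endRecord: counts the row and checks the
>   // limit regardless of whether the buffered nulls were flushed.
>   void endRecord() {
>     rowsInPage++;
>     if (rowsInPage >= pageRowCountLimit) {
>       writePage();
>     }
>   }
>
>   private void writePage() {
>     // With N buffered nulls and nothing flushed, valuesInPage is still 0 here.
>     System.out.printf("page written: %d values for %d rows (%d nulls still buffered)%n",
>         valuesInPage, rowsInPage, bufferedNulls);
>     valuesInPage = 0;
>     rowsInPage = 0;
>   }
>
>   public static void main(String[] args) {
>     ColumnWriteModel column = new ColumnWriteModel(1); // row count limit N = 1
>     column.writeNullForMissingField();                 // null is buffered, not written
>     column.endRecord();                                // limit hit -> empty page
>   }
> }
> {code}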
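>
> For reference, a Spark-free repro can be sketched directly against parquet-mr. The snippet below is an untested sketch: it assumes the {{ExampleParquetWriter}} test helper and reuses the {{parquet.page.row.count.limit}} key from the Spark test above, and the nested optional list column mirrors the Spark schema so that the null is buffered rather than written eagerly.
> {code:java}
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.Path;
> import org.apache.parquet.example.data.Group;
> import org.apache.parquet.example.data.simple.SimpleGroupFactory;
> import org.apache.parquet.hadoop.ParquetWriter;
> import org.apache.parquet.hadoop.example.ExampleParquetWriter;
> import org.apache.parquet.schema.MessageType;
> import org.apache.parquet.schema.MessageTypeParser;
>
> public class EmptyPageRepro {
>   public static void main(String[] args) throws Exception {
>     // Nested optional column, analogous to timestamps1 in the Spark repro.
>     MessageType schema = MessageTypeParser.parseMessageType(
>         "message m { optional group xs (LIST) {" +
>         " repeated group list { optional int64 element; } } }");
>
>     Configuration conf = new Configuration();
>     conf.set("parquet.page.row.count.limit", "1"); // same knob as the Spark test
>
>     try (ParquetWriter<Group> writer = ExampleParquetWriter
>         .builder(new Path("/tmp/parquet-1531-repro.parquet"))
>         .withConf(conf)
>         .withType(schema)
>         .build()) {
>       // xs is never set, so MessageColumnIO buffers a null for the column
>       // while endRecord() still counts the row against the page limit.
>       writer.write(new SimpleGroupFactory(schema).newGroup());
>     }
>     // With the bug present, the file now contains an empty page and
>     // reading it back fails.
>   }
> }
> {code}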