In parquet-mr 1.8.1, constructing an empty GroupType (and thus an empty
MessageType) is no longer allowed (see PARQUET-278
<https://issues.apache.org/jira/browse/PARQUET-278>). This change makes
sense in most cases since Parquet doesn't support empty groups. However,
there is one use case where an empty MessageType is valid: passing it
as the requestedSchema constructor argument of ReadContext when counting
rows in a Parquet file. This works because Parquet can retrieve the row
count from block metadata without materializing any columns. Take the
following PySpark shell
snippet (1.5-SNAPSHOT
<https://github.com/apache/spark/commit/010b03ed52f35fd4d426d522f8a9927ddc579209>,
which uses parquet-mr 1.7.0) as an example:
>>> path = 'file:///tmp/foo'
>>> # Writes 10 integers into a Parquet file
>>> sqlContext.range(10).coalesce(1).write.mode('overwrite').parquet(path)
>>> sqlContext.read.parquet(path).count()
10
Parquet related log lines:
15/08/21 12:32:04 INFO CatalystReadSupport: Going to read the following fields from the Parquet file:
Parquet form:
message root {
}
Catalyst form:
StructType()
15/08/21 12:32:04 INFO InternalParquetRecordReader: RecordReader initialized will read a total of 10 records.
15/08/21 12:32:04 INFO InternalParquetRecordReader: at row 0. reading next block
15/08/21 12:32:04 INFO InternalParquetRecordReader: block read in memory in 0 ms. row count = 10
We can see that Spark SQL passes no requested columns to the underlying
Parquet reader. What happens here is that:
1. Spark SQL creates a CatalystRowConverter with zero converters (and
thus only generates empty Rows).
2. InternalParquetRecordReader first obtains the row count from block
metadata (here
<https://github.com/apache/parquet-mr/blob/apache-parquet-1.8.1/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java#L184-L186>).
3. MessageColumnIO returns an EmptyRecordReader for reading the
Parquet file (here
<https://github.com/apache/parquet-mr/blob/apache-parquet-1.8.1/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java#L97-L99>).
4. InternalParquetRecordReader.nextKeyValue() is invoked n times, where
n equals the row count. Each time, it invokes the converter
created by Spark SQL and produces an empty Spark SQL row object.
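
The four steps above can be modelled with a small, self-contained Python
sketch. This is not parquet-mr or Spark code: the BlockMetaData and
EmptyRecordReader classes and the read_records and count_rows functions
below are hypothetical stand-ins that only mirror the control flow
described in the list.

```python
from dataclasses import dataclass
from typing import Callable, Iterator, List


@dataclass
class BlockMetaData:
    """Hypothetical stand-in for the footer metadata of one row group."""
    row_count: int


class EmptyRecordReader:
    """Toy reader for an empty requested schema: it touches no column data."""

    def __init__(self, converter: Callable[[], tuple]):
        self.converter = converter

    def read(self) -> tuple:
        # There are no columns to decode, so just ask the converter
        # for the current (empty) record.
        return self.converter()


def read_records(blocks: List[BlockMetaData],
                 converter: Callable[[], tuple]) -> Iterator[tuple]:
    # Step 2: the total comes from block metadata, not from column pages.
    total = sum(block.row_count for block in blocks)
    # Step 3: with zero requested columns, an empty reader is enough.
    reader = EmptyRecordReader(converter)
    # Step 4: one call per row, each yielding an empty record.
    for _ in range(total):
        yield reader.read()


def count_rows(blocks: List[BlockMetaData]) -> int:
    # Step 1: a converter with zero field converters only builds empty rows.
    return sum(1 for _ in read_records(blocks, converter=lambda: ()))


if __name__ == "__main__":
    print(count_rows([BlockMetaData(row_count=10)]))
```

The point of the sketch is that the loop length is driven entirely by the
metadata row count, and the per-row work is independent of how many
columns the file actually contains.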
When upgrading to Parquet 1.8.1, Hive worked around this issue by using
tableSchema as requestedSchema when no columns are requested (here
<https://github.com/apache/hive/commit/3e68cdc9962cacab59ee891fcca6a736ad10d37d#diff-cc764a8828c4acc2a27ba717610c3f0bR233>).
IMO this introduces a performance regression in cases like counting,
because all columns now have to be materialized just to count rows.
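
To make the cost difference concrete, here is a toy cost model in Python.
It assumes, purely for illustration, that read cost is proportional to
the number of column values decoded; the values_materialized function and
the three-column table are hypothetical and not taken from Hive or
parquet-mr.

```python
from typing import Sequence


def values_materialized(row_count: int,
                        table_columns: Sequence[str],
                        requested_columns: Sequence[str]) -> int:
    """Number of column values a toy reader decodes for a full scan."""
    unknown = set(requested_columns) - set(table_columns)
    if unknown:
        raise ValueError(f"unknown columns requested: {sorted(unknown)}")
    # Every requested column contributes one decoded value per row.
    return row_count * len(requested_columns)


if __name__ == "__main__":
    table_columns = ["id", "name", "price"]  # hypothetical table schema
    rows = 10

    # Empty requestedSchema: nothing is decoded, the count comes from metadata.
    print(values_materialized(rows, table_columns, []))

    # tableSchema used as requestedSchema: every column is decoded for every row.
    print(values_materialized(rows, table_columns, table_columns))
```

Under this assumption the empty-schema count decodes nothing, while the
workaround's cost grows with both the row count and the table width.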
I don't have a strong opinion about how to fix this issue for now. Maybe
we can provide a new ReadContext constructor without the requestedSchema
argument, to indicate that no columns are requested at all.
Cheng