Hey Xudong,

We have been digging into this issue for a while, and believe PR 5339 <http://github.com/apache/spark/pull/5339> and PR 5334 <http://github.com/apache/spark/pull/5334> should fix it.

There are two problems:

1. Normally we cache Parquet table metadata for better performance, but when converting Hive metastore Parquet tables, the cache is not used. Thus heavy operations like schema discovery are done every time a metastore Parquet table is converted.
2. With Parquet task side metadata reading (which is turned on by default), we can actually skip the row group information in the footer. However, we accidentally called a Parquet function which doesn't skip row group information.
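
To illustrate the first problem, here is a rough sketch of what caching table metadata buys us. It is not the actual Spark code: MetadataCacheSketch, discoverSchema, cachedSchema and the column names are all made up for illustration, and the "expensive" discovery step is just a stand-in.

```scala
import scala.collection.mutable

// Hypothetical sketch, not Spark's implementation.
object MetadataCacheSketch {
  // Counts how often the expensive discovery step actually runs.
  var discoveries = 0

  private val cache = mutable.Map.empty[String, Seq[String]]

  // Stand-in for the heavy work (e.g. reading footers of all part-files).
  // The returned column names are placeholders.
  private def discoverSchema(tablePath: String): Seq[String] = {
    discoveries += 1
    Seq("a", "b", "c")
  }

  // With the cache, discovery runs once per table path; without it,
  // every conversion of the table would pay the full cost again.
  def cachedSchema(tablePath: String): Seq[String] =
    cache.getOrElseUpdate(tablePath, discoverSchema(tablePath))
}
```

Calling cachedSchema twice for the same path runs discoverSchema only once, which is the behavior that was being bypassed for converted metastore tables.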

For your question about schema merging, Parquet allows different part-files to have different but compatible schemas. For example, part-00001.parquet has columns a and b, while part-00002.parquet may have columns a and c. In some cases, the summary files (_metadata and _common_metadata) contain the merged schema (a, b, and c), but it's not guaranteed. For example, when the user-defined metadata stored in different part-files contains different values for the same key, Parquet simply gives up writing summary files. That's why all part-files must be touched to get a precise merged schema.
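
To make the merging concrete, here is a minimal sketch under simplified assumptions. A schema is modelled as an ordered list of (column name, type name) pairs; this is not Parquet's or Spark's schema API, and SchemaMergeSketch, merge and mergeAll are names invented for this example. The column types in the usage note are assumptions too, since only the column names matter above.

```scala
// Hypothetical model of schema merging, not the Parquet/Spark implementation.
object SchemaMergeSketch {
  type Schema = Vector[(String, String)]

  // Merge `right` into `left`: new columns are appended, identical columns
  // are kept once, and a column with two different types is a conflict.
  def merge(left: Schema, right: Schema): Either[String, Schema] =
    right.foldLeft[Either[String, Schema]](Right(left)) {
      case (Right(acc), (name, tpe)) =>
        acc.find(_._1 == name) match {
          case Some((_, existing)) if existing != tpe =>
            Left(s"column $name: $existing conflicts with $tpe")
          case Some(_) => Right(acc)
          case None    => Right(acc :+ (name -> tpe))
        }
      case (failure, _) => failure
    }

  // A precise merged schema needs every part-file's schema as input,
  // which is why all part-files have to be touched.
  def mergeAll(schemas: Seq[Schema]): Either[String, Schema] =
    schemas.foldLeft[Either[String, Schema]](Right(Vector.empty)) {
      case (Right(acc), next) => merge(acc, next)
      case (failure, _)       => failure
    }
}
```

With one part-file holding (a: int, b: string) and another holding (a: int, c: double), mergeAll yields columns a, b and c in that order; if two part-files disagree on the type of a, it returns a Left describing the conflict.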

However, in scenarios where a centralized authoritative schema is available (e.g. the Hive metastore schema, or the schema provided by the user via data source DDL), we don't need to do schema merging on the driver side. We can defer it to the executor side, where each task only needs to reconcile the part-files it has to touch. This is also what the Parquet developers did recently for parquet-hadoop <https://github.com/apache/incubator-parquet-mr/pull/45>.
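
A rough sketch of what that per-task reconciliation could look like, again under simplified assumptions: schemas are just lists of column names, and ReconcileSketch and reconcile are hypothetical names, not Spark or Parquet APIs. The point is only that a task needs the authoritative schema plus the schema of its own part-file, nothing else.

```scala
// Hypothetical sketch of executor-side reconciliation, not actual Spark code.
object ReconcileSketch {
  // For each column of the authoritative schema, return its position in the
  // part-file, or None when the part-file does not contain that column
  // (a reader could then fill the column with nulls).
  def reconcile(authoritative: Seq[String], partFile: Seq[String]): Seq[(String, Option[Int])] =
    authoritative.map { name =>
      val idx = partFile.indexOf(name)
      name -> (if (idx >= 0) Some(idx) else None)
    }
}
```

For an authoritative schema (a, b, c) and a part-file with columns (a, c), this maps a to position 0, b to None, and c to position 1, without looking at any other part-file.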

Cheng

On 3/31/15 11:49 PM, Zheng, Xudong wrote:
Thanks Cheng!

Setting 'spark.sql.parquet.useDataSourceApi' to false resolves my issue, but PR 5231 does not seem to. Not sure whether I did anything else wrong ...

BTW, we are actually very interested in the schema merging feature in Spark 1.3, so both of these solutions will disable this feature, right? It seems that Parquet metadata is stored in a file named _metadata in the Parquet file folder (each folder is a partition, as we use a partitioned table), so why do we need to scan all Parquet part-files? Is there any other solution that could keep the schema merging feature at the same time? We really like this feature :)

On Tue, Mar 31, 2015 at 3:19 PM, Cheng Lian <lian.cs....@gmail.com> wrote:

    Hi Xudong,

    This is probably because Parquet schema merging is turned on by
    default. This is generally useful for Parquet files with different
    but compatible schemas. But it needs to read metadata from all
    Parquet part-files. This can be problematic when reading Parquet
    files with lots of part-files, especially when the user doesn't
    need schema merging.

    This issue is tracked by SPARK-6575, and here is a PR for it:
    https://github.com/apache/spark/pull/5231. This PR adds a
    configuration to disable schema merging by default when doing Hive
    metastore Parquet table conversion.

    Another workaround is to fall back to the old Parquet code by
    setting spark.sql.parquet.useDataSourceApi to false.

    Cheng


    On 3/31/15 2:47 PM, Zheng, Xudong wrote:
    Hi all,

    We are using Parquet Hive tables, and we are upgrading to Spark
    1.3. But we find that even a simple COUNT(*) query is much
    slower (100x) than on Spark 1.2.

    I find that most of the time is spent on the driver getting HDFS
    blocks. I see a large number of logs like the ones below:

    15/03/30 23:03:43 DEBUG ProtobufRpcEngine: Call: getBlockLocations took 2097ms
    15/03/30 23:03:43 DEBUG DFSClient: newInfo = LocatedBlocks{
       fileLength=77153436
       underConstruction=false
       blocks=[LocatedBlock{BP-1236294426-10.152.90.181-1425290838173:blk_1075187948_1448275; getBlockSize()=77153436; corrupt=false; offset=0; locs=[10.152.116.172:50010, 10.152.116.169:50010, 10.153.125.184:50010]}]
       lastLocatedBlock=LocatedBlock{BP-1236294426-10.152.90.181-1425290838173:blk_1075187948_1448275; getBlockSize()=77153436; corrupt=false; offset=0; locs=[10.152.116.169:50010, 10.153.125.184:50010, 10.152.116.172:50010]}
       isLastBlockComplete=true}
    15/03/30 23:03:43 DEBUG DFSClient: Connecting to datanode 10.152.116.172:50010

    I compared the printed log with Spark 1.2. Although the number of
    getBlockLocations calls is similar, each such operation only
    cost 20~30 ms there (but it is 2000~3000 ms now), and it didn't
    print the detailed LocatedBlocks info.

    Another finding is that if I read the Parquet file via Scala code
    from spark-shell as below, it looks fine; the computation
    returns the result as quickly as before.

    sqlContext.parquetFile("data/myparquettable")

    Any idea about it? Thank you!


-- 郑旭东
    Zheng, Xudong





--
郑旭东
Zheng, Xudong

