Based on what you've described, I think you should be able to use Spark's 
parquet reader plus partition pruning in 2.1.
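
As a rough sketch of what that could look like (untested against your cluster), assuming Spark 2.1 with the spark-hive module on the classpath and that the two settings discussed in this thread are the relevant ones. The object and app names are made up for illustration; the table and filter come from the plan you pasted:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical driver program; only the two config keys and the query
// come from the thread itself.
object PruningCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("partition-pruning-check") // hypothetical app name
      .enableHiveSupport()
      // Read Hive Parquet tables with Spark's built-in Parquet reader.
      .config("spark.sql.hive.convertMetastoreParquet", "true")
      // Ask the metastore only for partitions matching the filter.
      .config("spark.sql.hive.metastorePartitionPruning", "true")
      .getOrCreate()

    // Same query as in the quoted plan, filtering on a partition column.
    val df = spark.sql("select count(1) from rajub.dummy where year = '2017'")

    // Print the physical plan so the scanned partitions can be inspected.
    df.explain()

    spark.stop()
  }
}
```

If pruning is taking effect, I would expect the scan in the printed plan to reference only the year=2017 partitions, and the metastore audit log to stop showing get_partitions calls for the table.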

> On Jan 17, 2017, at 10:44 PM, Raju Bairishetti <r...@apache.org> wrote:
> 
> Thanks for the detailed explanation. Is it completely fixed in spark-2.1.0?
> 
> We are giving the Spark driver a very large heap to avoid OOM errors (heap
> space / GC overhead limit) in the Spark app. But when we run two or three jobs
> together, they bring down the Hive metastore. We had to forcibly drop older
> partitions to keep the Hive metastore from going down so often.
> 
> 
> On Wed, Jan 18, 2017 at 2:09 PM, Michael Allman <mich...@videoamp.com> wrote:
> I think I understand. Partition pruning for the case where 
> spark.sql.hive.convertMetastoreParquet is true was not added to Spark until 
> 2.1.0. I think that in previous versions it only worked when 
> spark.sql.hive.convertMetastoreParquet is false. Unfortunately, that 
> configuration gives you data decoding errors. If it's possible for you to 
> write all of your data with Hive, then you should be able to read it without 
> decoding errors and with partition pruning turned on. Another possibility is 
> running your Spark app with a very large maximum heap configuration, like 8g 
> or even 16g. However, loading all of that partition metadata can be quite 
> slow for very large tables. I'm sorry I can't think of a better solution for 
> you.
> 
> Michael
> 
> 
> 
> 
>> On Jan 17, 2017, at 8:59 PM, Raju Bairishetti <r...@apache.org> wrote:
>> 
>> Tested on both 1.5.2 and 1.6.1.
>> 
>> On Wed, Jan 18, 2017 at 12:52 PM, Michael Allman <mich...@videoamp.com> wrote:
>> What version of Spark are you running?
>> 
>>> On Jan 17, 2017, at 8:42 PM, Raju Bairishetti <r...@apache.org> wrote:
>>> 
>>> describe dummy;
>>> OK
>>> sample              string
>>> year                string
>>> month               string
>>>
>>> # Partition Information
>>> # col_name            data_type           comment
>>> year                string
>>> month               string
>>> 
>>> val df = sqlContext.sql("select count(1) from rajub.dummy where year='2017'")
>>> df: org.apache.spark.sql.DataFrame = [_c0: bigint]
>>>
>>> scala> df.explain
>>> == Physical Plan ==
>>> TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[_c0#3070L])
>>> +- TungstenExchange SinglePartition, None
>>>    +- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#3076L])
>>>       +- Scan ParquetRelation: rajub.dummy[] InputPaths:
>>>            maprfs:/user/rajub/dummy/sample/year=2016/month=10,
>>>            maprfs:/user/rajub/dummy/sample/year=2016/month=11,
>>>            maprfs:/user/rajub/dummy/sample/year=2016/month=9,
>>>            maprfs:/user/rajub/dummy/sample/year=2017/month=10,
>>>            maprfs:/user/rajub/dummy/sample/year=2017/month=11,
>>>            maprfs:/user/rajub/dummy/sample/year=2017/month=9
>>> 
>>> 
>>> On Wed, Jan 18, 2017 at 12:25 PM, Michael Allman <mich...@videoamp.com> wrote:
>>> Can you paste the actual query plan here, please?
>>> 
>>>> On Jan 17, 2017, at 7:38 PM, Raju Bairishetti <r...@apache.org> wrote:
>>>> 
>>>> 
>>>> On Wed, Jan 18, 2017 at 11:13 AM, Michael Allman <mich...@videoamp.com> wrote:
>>>> What is the physical query plan after you set 
>>>> spark.sql.hive.convertMetastoreParquet to true?
>>>> The physical plan contains all the partition locations.
>>>> 
>>>> Michael
>>>> 
>>>>> On Jan 17, 2017, at 6:51 PM, Raju Bairishetti <r...@apache.org> wrote:
>>>>> 
>>>>> Thanks Michael for the response.
>>>>> 
>>>>> 
>>>>> On Wed, Jan 18, 2017 at 2:45 AM, Michael Allman <mich...@videoamp.com> wrote:
>>>>> Hi Raju,
>>>>> 
>>>>> I'm sorry this isn't working for you. I helped author this functionality 
>>>>> and will try my best to help.
>>>>> 
>>>>> First, I'm curious why you set spark.sql.hive.convertMetastoreParquet to 
>>>>> false? 
>>>>> I had set it as suggested in SPARK-6910 and the corresponding pull
>>>>> requests. It did not work for me without setting the
>>>>> spark.sql.hive.convertMetastoreParquet property.
>>>>> 
>>>>> Can you link specifically to the jira issue or spark pr you referred to? 
>>>>> The first thing I would try is setting 
>>>>> spark.sql.hive.convertMetastoreParquet to true. Setting that to false 
>>>>> might also explain why you're getting parquet decode errors. If you're 
>>>>> writing your table data with Spark's parquet file writer and reading with 
>>>>> Hive's parquet file reader, there may be an incompatibility accounting 
>>>>> for the decode errors you're seeing. 
>>>>> 
>>>>> https://issues.apache.org/jira/browse/SPARK-6910. My main motivation is
>>>>> to avoid fetching all the partitions. We reverted the
>>>>> spark.sql.hive.convertMetastoreParquet setting to true because of the
>>>>> decoding errors. After reverting it, Spark is fetching all partitions
>>>>> from the table.
>>>>> 
>>>>> Can you reply with your table's Hive metastore schema, including 
>>>>> partition schema?
>>>>>      col1 string
>>>>>      col2 string
>>>>>      year int
>>>>>      month int
>>>>>      day int
>>>>>      hour int
>>>>> # Partition Information
>>>>> # col_name            data_type           comment
>>>>> year  int
>>>>> month int
>>>>> day int
>>>>> hour int
>>>>> venture string
>>>>> 
>>>>>  
>>>>> Where are the table's files located?
>>>>> In hadoop. Under some user directory. 
>>>>> If you do a "show partitions <dbname>.<tablename>" in the spark-sql 
>>>>> shell, does it show the partitions you expect to see? If not, run "msck 
>>>>> repair table <dbname>.<tablename>".
>>>>> Yes. It is listing the partitions
>>>>> Cheers,
>>>>> 
>>>>> Michael
>>>>> 
>>>>> 
>>>>>> On Jan 17, 2017, at 12:02 AM, Raju Bairishetti <r...@apache.org> wrote:
>>>>>> 
>>>>>> I had a high-level look at the code. It seems the getHiveQlPartitions
>>>>>> method from HiveMetastoreCatalog is called irrespective of the
>>>>>> metastorePartitionPruning conf value.
>>>>>>
>>>>>> It should not fetch all partitions if we set metastorePartitionPruning
>>>>>> to true (the default value is false):
>>>>>>
>>>>>> def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = {
>>>>>>   val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) {
>>>>>>     table.getPartitions(predicates)
>>>>>>   } else {
>>>>>>     allPartitions
>>>>>>   }
>>>>>> ...
>>>>>> def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] =
>>>>>>   client.getPartitionsByFilter(this, predicates)
>>>>>>
>>>>>> lazy val allPartitions = table.getAllPartitions
>>>>>>
>>>>>> But somehow getAllPartitions is still called even after setting
>>>>>> metastorePartitionPruning to true.
>>>>>> Am I missing something, or looking in the wrong place?
>>>>>> 
>>>>>> On Mon, Jan 16, 2017 at 12:53 PM, Raju Bairishetti <r...@apache.org> wrote:
>>>>>> Waiting for suggestions/help on this... 
>>>>>> 
>>>>>> On Wed, Jan 11, 2017 at 12:14 PM, Raju Bairishetti <r...@apache.org> wrote:
>>>>>> Hello,
>>>>>>       
>>>>>> Spark SQL is generating a query plan with information for all
>>>>>> partitions even when we apply filters on partition columns in the
>>>>>> query. Because of this, the Spark driver and Hive metastore are hitting
>>>>>> OOM errors, as each table has a large number of partitions.
>>>>>> 
>>>>>> We can confirm from hive audit logs that it tries to fetch all 
>>>>>> partitions from hive metastore.
>>>>>> 
>>>>>>  2016-12-28 07:18:33,749 INFO  [pool-4-thread-184]: HiveMetaStore.audit 
>>>>>> (HiveMetaStore.java:logAuditEvent(371)) - ugi=rajub    ip=/x.x.x.x   
>>>>>> cmd=get_partitions : db=xxxx tbl=xxxxx
>>>>>> 
>>>>>> 
>>>>>> We configured the following parameters in the Spark conf to fix the
>>>>>> above issue (source: Spark JIRA and GitHub pull requests):
>>>>>>     spark.sql.hive.convertMetastoreParquet   false
>>>>>>     spark.sql.hive.metastorePartitionPruning   true
>>>>>> 
>>>>>>    plan:  rdf.explain
>>>>>>    == Physical Plan ==
>>>>>>        HiveTableScan [rejection_reason#626], MetastoreRelation dbname, 
>>>>>> tablename, None,   [(year#314 = 2016),(month#315 = 12),(day#316 = 
>>>>>> 28),(hour#317 = 2),(venture#318 = DEFAULT)]
>>>>>> 
>>>>>> With these settings, get_partitions_by_filter is called and only the
>>>>>> required partitions are fetched.
>>>>>>
>>>>>> But we have been seeing frequent parquetDecode errors in our
>>>>>> applications since then. It looks like these decoding errors are caused
>>>>>> by changing the serde from Spark's built-in one to the Hive serde.
>>>>>>
>>>>>> I feel that fixing query plan generation in Spark SQL is the right
>>>>>> approach, instead of forcing users to use the Hive serde.
>>>>>> 
>>>>>> Is there any workaround/way to fix this issue? I would like to hear more 
>>>>>> thoughts on this :)
>>>>>> 
>>>>>> ------
>>>>>> Thanks,
>>>>>> Raju Bairishetti,
>>>>>> www.lazada.com <http://www.lazada.com/>

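For anyone following the getHiveQlPartitions excerpt quoted in the thread, the branch it describes can be modelled in plain Scala without Spark. This is only a toy sketch: Partition, FakeMetastore, and PruningDemo are hypothetical names, not Spark or Hive classes, and the client-side filter on the unpruned path is an assumption, since the quoted excerpt elides what happens after rawPartitions is computed.

```scala
// Toy model of the branch quoted in the thread; not Spark's actual classes.
final case class Partition(year: Int, month: Int)

// Stand-in for the Hive metastore client that counts each kind of call.
final class FakeMetastore(partitions: Seq[Partition]) {
  var getAllCalls = 0
  var getByFilterCalls = 0

  // Models a get_partitions call: returns every partition of the table.
  def getAllPartitions(): Seq[Partition] = {
    getAllCalls += 1
    partitions
  }

  // Models get_partitions_by_filter: the filter runs on the metastore side.
  def getPartitionsByFilter(pred: Partition => Boolean): Seq[Partition] = {
    getByFilterCalls += 1
    partitions.filter(pred)
  }
}

object PruningDemo {
  // Same shape as the quoted method: filter in the metastore when pruning is
  // enabled; otherwise fetch everything and (assumed) filter on the driver.
  def partitionsFor(
      store: FakeMetastore,
      metastorePartitionPruning: Boolean,
      pred: Partition => Boolean): Seq[Partition] =
    if (metastorePartitionPruning) store.getPartitionsByFilter(pred)
    else store.getAllPartitions().filter(pred)

  def main(args: Array[String]): Unit = {
    // Six partitions, mirroring the year/month layout in the quoted plan.
    val all = for (y <- Seq(2016, 2017); m <- Seq(9, 10, 11)) yield Partition(y, m)
    val store = new FakeMetastore(all)
    val in2017: Partition => Boolean = _.year == 2017

    val pruned = partitionsFor(store, metastorePartitionPruning = true, pred = in2017)
    println(s"returned ${pruned.size} partitions, getAll calls: ${store.getAllCalls}")
  }
}
```

In this model the full listing is never requested when the flag is true, which is the behaviour the thread expected. Seeing get_partitions in the audit log despite the flag therefore suggests that a different code path is doing the fetch, consistent with the explanation about converted Parquet tables before 2.1.0.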