Thanks for the detailed explanation. Is it completely fixed in spark-2.1.0?

  We are giving the Spark driver a very large heap to avoid OOM errors (heap
space / GC overhead limit) in the Spark app. But when we run two or three
jobs together, they bring down the Hive metastore. We had to forcibly drop
older partitions to keep the Hive metastore from going down so often.
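
As a rough sketch of how the settings discussed in this thread could be
checked on 2.1.0: the snippet below assumes Spark 2.1.0 with Hive support on
the classpath and a reachable metastore. The object name and app name are
placeholders; the table and filter are the ones from the plan quoted below.
Whether pruning really happens still has to be confirmed from the plan and
the metastore audit log.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical check app; only the two config keys and the query come from this thread.
object PartitionPruningCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("partition-pruning-check") // placeholder name
      .enableHiveSupport()
      // Keep Spark's built-in Parquet reader for metastore Parquet tables.
      .config("spark.sql.hive.convertMetastoreParquet", "true")
      // Ask Spark to push partition predicates down to the metastore.
      .config("spark.sql.hive.metastorePartitionPruning", "true")
      .getOrCreate()

    // Same query as in the quoted plan, filtering on a partition column.
    val df = spark.sql("select count(1) from rajub.dummy where year = '2017'")

    // Print the physical plan so the scanned partitions can be inspected.
    df.explain()

    spark.stop()
  }
}
```

If pruning takes effect, the expectation from this thread is that the Hive
audit log shows get_partitions_by_filter rather than get_partitions.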


On Wed, Jan 18, 2017 at 2:09 PM, Michael Allman <mich...@videoamp.com>
wrote:

> I think I understand. Partition pruning for the case where 
> spark.sql.hive.convertMetastoreParquet
> is true was not added to Spark until 2.1.0. I think that in previous
> versions it only worked when spark.sql.hive.convertMetastoreParquet is
> false. Unfortunately, that configuration gives you data decoding errors. If
> it's possible for you to write all of your data with Hive, then you should
> be able to read it without decoding errors and with partition pruning
> turned on. Another possibility is running your Spark app with a very large
> maximum heap configuration, like 8g or even 16g. However, loading all of
> that partition metadata can be quite slow for very large tables. I'm sorry
> I can't think of a better solution for you.
>
> Michael
>
>
>
>
> On Jan 17, 2017, at 8:59 PM, Raju Bairishetti <r...@apache.org> wrote:
>
> Tested on both 1.5.2 and 1.6.1.
>
> On Wed, Jan 18, 2017 at 12:52 PM, Michael Allman <mich...@videoamp.com>
> wrote:
>
>> What version of Spark are you running?
>>
>> On Jan 17, 2017, at 8:42 PM, Raju Bairishetti <r...@apache.org> wrote:
>>
>>  describe dummy;
>>
>> OK
>>
>> sample              string
>>
>> year                string
>>
>> month               string
>>
>> # Partition Information
>>
>> # col_name            data_type           comment
>>
>> year                string
>>
>> month               string
>>
>>
>> val df = sqlContext.sql("select count(1) from rajub.dummy where year='2017'")
>>
>> df: org.apache.spark.sql.DataFrame = [_c0: bigint]
>>
>>
>> *scala> df.explain*
>>
>> == Physical Plan ==
>>
>> TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[_c0#3070L])
>>
>> +- TungstenExchange SinglePartition, None
>>
>>    +- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#3076L])
>>
>>       +- Scan ParquetRelation: rajub.dummy[] InputPaths:
>> maprfs:/user/rajub/dummy/sample/year=2016/month=10,
>> maprfs:/user/rajub/dummy/sample/year=*2016*/month=11,
>> maprfs:/user/rajub/dummy/sample/year=*2016*/month=9,
>> maprfs:/user/rajub/dummy/sample/year=2017/month=10,
>> maprfs:/user/rajub/dummy/sample/year=2017/month=11,
>> maprfs:/user/rajub/dummy/sample/year=2017/month=9
>>
>> On Wed, Jan 18, 2017 at 12:25 PM, Michael Allman <mich...@videoamp.com>
>> wrote:
>>
>>> Can you paste the actual query plan here, please?
>>>
>>> On Jan 17, 2017, at 7:38 PM, Raju Bairishetti <r...@apache.org> wrote:
>>>
>>>
>>> On Wed, Jan 18, 2017 at 11:13 AM, Michael Allman <mich...@videoamp.com>
>>> wrote:
>>>
>>>> What is the physical query plan after you set
>>>> spark.sql.hive.convertMetastoreParquet to true?
>>>>
>>> Physical plan contains all the partition locations
>>>
>>>>
>>>> Michael
>>>>
>>>> On Jan 17, 2017, at 6:51 PM, Raju Bairishetti <r...@apache.org> wrote:
>>>>
>>>> Thanks Michael for the response.
>>>>
>>>>
>>>> On Wed, Jan 18, 2017 at 2:45 AM, Michael Allman <mich...@videoamp.com>
>>>> wrote:
>>>>
>>>>> Hi Raju,
>>>>>
>>>>> I'm sorry this isn't working for you. I helped author this
>>>>> functionality and will try my best to help.
>>>>>
>>>>> First, I'm curious why you set spark.sql.hive.convertMetastoreParquet
>>>>> to false?
>>>>>
>>>> I had set it as suggested in SPARK-6910 and the corresponding pull requests.
>>>> It did not work for me without setting the
>>>> *spark.sql.hive.convertMetastoreParquet* property.
>>>>
>>>>> Can you link specifically to the jira issue or spark pr you referred
>>>>> to? The first thing I would try is setting 
>>>>> spark.sql.hive.convertMetastoreParquet
>>>>> to true. Setting that to false might also explain why you're getting
>>>>> parquet decode errors. If you're writing your table data with Spark's
>>>>> parquet file writer and reading with Hive's parquet file reader, there may
>>>>> be an incompatibility accounting for the decode errors you're seeing.
>>>>>
>>>> https://issues.apache.org/jira/browse/SPARK-6910 . My main
>>>> motivation is to avoid fetching all the partitions. We reverted the
>>>> spark.sql.hive.convertMetastoreParquet setting to true because of the
>>>> decoding errors. After reverting it, Spark fetches all partitions from
>>>> the table.
>>>>
>>>>> Can you reply with your table's Hive metastore schema, including
>>>>> partition schema?
>>>>>
>>>>      col1 string
>>>>      col2 string
>>>>      year int
>>>>      month int
>>>>      day int
>>>>      hour int
>>>>
>>>> # Partition Information
>>>>
>>>> # col_name            data_type           comment
>>>>
>>>> year  int
>>>>
>>>> month int
>>>>
>>>> day int
>>>>
>>>> hour int
>>>>
>>>> venture string
>>>>
>>>>>
>>>>>
>>>>> Where are the table's files located?
>>>>>
>>>> In hadoop. Under some user directory.
>>>>
>>>>> If you do a "show partitions <dbname>.<tablename>" in the spark-sql
>>>>> shell, does it show the partitions you expect to see? If not, run "msck
>>>>> repair table <dbname>.<tablename>".
>>>>>
>>>> Yes. It is listing the partitions
>>>>
>>>>> Cheers,
>>>>>
>>>>> Michael
>>>>>
>>>>>
>>>>> On Jan 17, 2017, at 12:02 AM, Raju Bairishetti <r...@apache.org>
>>>>> wrote:
>>>>>
>>>>> Had a high level look into the code. Seems getHiveQlPartitions  method
>>>>> from HiveMetastoreCatalog is getting called irrespective of 
>>>>> metastorePartitionPruning
>>>>> conf value.
>>>>>
>>>>>  It should not fetch all partitions if we set
>>>>> metastorePartitionPruning to true (Default value for this is false)
>>>>>
>>>>> def getHiveQlPartitions(predicates: Seq[Expression] = Nil): 
>>>>> Seq[Partition] = {
>>>>>   val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) {
>>>>>     table.getPartitions(predicates)
>>>>>   } else {
>>>>>     allPartitions
>>>>>   }
>>>>>
>>>>> ...
>>>>>
>>>>> def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] =
>>>>>   client.getPartitionsByFilter(this, predicates)
>>>>>
>>>>> lazy val allPartitions = table.getAllPartitions
>>>>>
>>>>> But somehow getAllPartitions is getting called even after setting
>>>>> metastorePartitionPruning to true.
>>>>>
>>>>> Am I missing something or looking at the wrong place?
>>>>>
>>>>>
>>>>> On Tue, Jan 17, 2017 at 4:01 PM, Raju Bairishetti <r...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>>    Spark SQL is generating a query plan with information for all
>>>>>> partitions even though we apply filters on the partition columns in the
>>>>>> query. Because of this, the Spark driver/Hive metastore is hitting OOM,
>>>>>> as each table has lots of partitions.
>>>>>>
>>>>>> We can confirm from hive audit logs that it tries to
>>>>>> *fetch all partitions* from hive metastore.
>>>>>>
>>>>>>  2016-12-28 07:18:33,749 INFO  [pool-4-thread-184]:
>>>>>> HiveMetaStore.audit (HiveMetaStore.java:logAuditEvent(371)) -
>>>>>> ugi=rajub    ip=/x.x.x.x   cmd=get_partitions : db=xxxx tbl=xxxxx
>>>>>>
>>>>>>
>>>>>> Configured the following parameters in the spark conf to fix the
>>>>>> above issue(source: from spark-jira & github pullreq):
>>>>>>
>>>>>> *spark.sql.hive.convertMetastoreParquet   false*
>>>>>> *    spark.sql.hive.metastorePartitionPruning   true*
>>>>>>
>>>>>>
>>>>>> *   plan:  rdf.explain*
>>>>>> *   == Physical Plan ==*
>>>>>>        HiveTableScan [rejection_reason#626], MetastoreRelation
>>>>>> dbname, tablename, None,   [(year#314 = 2016),(month#315 = 12),(day#316 =
>>>>>> 28),(hour#317 = 2),(venture#318 = DEFAULT)]
>>>>>>
>>>>>> The *get_partitions_by_filter* method is called and fetches only the
>>>>>> required partitions.
>>>>>>
>>>>>>     But we are seeing parquetDecode errors in our applications
>>>>>> frequently after this. It looks like these decoding errors were caused by
>>>>>> changing the serde from spark-builtin to hive serde.
>>>>>>
>>>>>> I feel that *fixing query plan generation in spark-sql* is the
>>>>>> right approach instead of forcing users to use hive serde.
>>>>>>
>>>>>> Is there any workaround/way to fix this issue? I would like to hear
>>>>>> more thoughts on this :)
>>>>>>
>>>>>>
>>>>>> On Tue, Jan 17, 2017 at 4:00 PM, Raju Bairishetti <r...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Had a high level look into the code. Seems getHiveQlPartitions  method
>>>>>>> from HiveMetastoreCatalog is getting called irrespective of 
>>>>>>> metastorePartitionPruning
>>>>>>> conf value.
>>>>>>>
>>>>>>>  It should not fetch all partitions if we set
>>>>>>> metastorePartitionPruning to true (Default value for this is false)
>>>>>>>
>>>>>>> def getHiveQlPartitions(predicates: Seq[Expression] = Nil): 
>>>>>>> Seq[Partition] = {
>>>>>>>   val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) {
>>>>>>>     table.getPartitions(predicates)
>>>>>>>   } else {
>>>>>>>     allPartitions
>>>>>>>   }
>>>>>>>
>>>>>>> ...
>>>>>>>
>>>>>>> def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] =
>>>>>>>   client.getPartitionsByFilter(this, predicates)
>>>>>>>
>>>>>>> lazy val allPartitions = table.getAllPartitions
>>>>>>>
>>>>>>> But somehow getAllPartitions is getting called even after setting
>>>>>>> metastorePartitionPruning to true.
>>>>>>>
>>>>>>> Am I missing something or looking at the wrong place?
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Jan 16, 2017 at 12:53 PM, Raju Bairishetti <r...@apache.org>
>>>>>>>  wrote:
>>>>>>>
>>>>>>>> Waiting for suggestions/help on this...
>>>>>>>>
>>>>>>>> On Wed, Jan 11, 2017 at 12:14 PM, Raju Bairishetti <r...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hello,
>>>>>>>>>
>>>>>>>>>    Spark SQL is generating a query plan with information for all
>>>>>>>>> partitions even though we apply filters on the partition columns in
>>>>>>>>> the query. Because of this, the Spark driver/Hive metastore is
>>>>>>>>> hitting OOM, as each table has lots of partitions.
>>>>>>>>>
>>>>>>>>> We can confirm from hive audit logs that it tries to *fetch all
>>>>>>>>> partitions* from hive metastore.
>>>>>>>>>
>>>>>>>>>  2016-12-28 07:18:33,749 INFO  [pool-4-thread-184]:
>>>>>>>>> HiveMetaStore.audit (HiveMetaStore.java:logAuditEvent(371)) -
>>>>>>>>> ugi=rajub    ip=/x.x.x.x   cmd=get_partitions : db=xxxx tbl=xxxxx
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Configured the following parameters in the spark conf to fix the
>>>>>>>>> above issue(source: from spark-jira & github pullreq):
>>>>>>>>>
>>>>>>>>> *spark.sql.hive.convertMetastoreParquet   false*
>>>>>>>>> *    spark.sql.hive.metastorePartitionPruning   true*
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> *   plan:  rdf.explain*
>>>>>>>>> *   == Physical Plan ==*
>>>>>>>>>        HiveTableScan [rejection_reason#626], MetastoreRelation
>>>>>>>>> dbname, tablename, None,   [(year#314 = 2016),(month#315 = 12),(day#316 =
>>>>>>>>> 28),(hour#317 = 2),(venture#318 = DEFAULT)]
>>>>>>>>>
>>>>>>>>> The *get_partitions_by_filter* method is called and fetches only the
>>>>>>>>> required partitions.
>>>>>>>>>
>>>>>>>>>     But we are seeing parquetDecode errors in our applications
>>>>>>>>> frequently after this. It looks like these decoding errors were
>>>>>>>>> caused by changing the serde from spark-builtin to hive serde.
>>>>>>>>>
>>>>>>>>> I feel that *fixing query plan generation in spark-sql* is
>>>>>>>>> the right approach instead of forcing users to use hive serde.
>>>>>>>>>
>>>>>>>>> Is there any workaround/way to fix this issue? I would like to
>>>>>>>>> hear more thoughts on this :)
>>>>>>>>>
>>>>>>>>> ------
>>>>>>>>> Thanks,
>>>>>>>>> Raju Bairishetti,
>>>>>>>>> www.lazada.com


-- 

------
Thanks,
Raju Bairishetti,
www.lazada.com
