Thanks for the detailed explanation. Is it completely fixed in Spark 2.1.0?

We are giving very high memory to the Spark driver to avoid the OOM (heap space / GC overhead limit) errors in the Spark app. But when we run two or three jobs together, they bring down the Hive metastore. We had to forcefully drop older partitions to avoid taking the Hive metastore down so frequently.
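For context, here is a minimal sketch of how the two settings discussed in the thread below can be toggled from a Spark 1.6.x application. The app name, table, and filter are placeholders for illustration, not our production job:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.hive.HiveContext

    object PartitionPruningCheck {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("partition-pruning-check"))
        val sqlContext = new HiveContext(sc)

        // Push partition predicates down to the metastore (get_partitions_by_filter)
        // instead of listing every partition of the table.
        sqlContext.setConf("spark.sql.hive.metastorePartitionPruning", "true")

        // Keep Spark's built-in Parquet support (the default); setting this to false
        // switches to the Hive serde path, which is where the decode errors showed up.
        sqlContext.setConf("spark.sql.hive.convertMetastoreParquet", "true")

        // Inspect the physical plan to see whether only matching partitions are scanned.
        sqlContext.sql("select count(1) from rajub.dummy where year = '2017'").explain()
      }
    }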
On Wed, Jan 18, 2017 at 2:09 PM, Michael Allman <mich...@videoamp.com> wrote:

> I think I understand. Partition pruning for the case where spark.sql.hive.convertMetastoreParquet is true was not added to Spark until 2.1.0. I think that in previous versions it only worked when spark.sql.hive.convertMetastoreParquet is false. Unfortunately, that configuration gives you data decoding errors. If it's possible for you to write all of your data with Hive, then you should be able to read it without decoding errors and with partition pruning turned on. Another possibility is running your Spark app with a very large maximum heap configuration, like 8g or even 16g. However, loading all of that partition metadata can be quite slow for very large tables. I'm sorry I can't think of a better solution for you.
>
> Michael
>
> On Jan 17, 2017, at 8:59 PM, Raju Bairishetti <r...@apache.org> wrote:
>
> Tested on both 1.5.2 and 1.6.1.
>
> On Wed, Jan 18, 2017 at 12:52 PM, Michael Allman <mich...@videoamp.com> wrote:
>
>> What version of Spark are you running?
>>
>> On Jan 17, 2017, at 8:42 PM, Raju Bairishetti <r...@apache.org> wrote:
>>
>> describe dummy;
>> OK
>> sample                  string
>> year                    string
>> month                   string
>>
>> # Partition Information
>> # col_name              data_type       comment
>> year                    string
>> month                   string
>>
>> val df = sqlContext.sql("select count(1) from rajub.dummy where year='2017'")
>> df: org.apache.spark.sql.DataFrame = [_c0: bigint]
>>
>> scala> df.explain
>> == Physical Plan ==
>> TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[_c0#3070L])
>> +- TungstenExchange SinglePartition, None
>>    +- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#3076L])
>>       +- Scan ParquetRelation: rajub.dummy[] InputPaths:
>>            maprfs:/user/rajub/dummy/sample/year=2016/month=10,
>>            maprfs:/user/rajub/dummy/sample/year=2016/month=11,
>>            maprfs:/user/rajub/dummy/sample/year=2016/month=9,
>>            maprfs:/user/rajub/dummy/sample/year=2017/month=10,
>>            maprfs:/user/rajub/dummy/sample/year=2017/month=11,
>>            maprfs:/user/rajub/dummy/sample/year=2017/month=9
>>
>> On Wed, Jan 18, 2017 at 12:25 PM, Michael Allman <mich...@videoamp.com> wrote:
>>
>>> Can you paste the actual query plan here, please?
>>>
>>> On Jan 17, 2017, at 7:38 PM, Raju Bairishetti <r...@apache.org> wrote:
>>>
>>> On Wed, Jan 18, 2017 at 11:13 AM, Michael Allman <mich...@videoamp.com> wrote:
>>>
>>>> What is the physical query plan after you set spark.sql.hive.convertMetastoreParquet to true?
>>>
>>> The physical plan contains all the partition locations.
>>>
>>>> Michael
>>>>
>>>> On Jan 17, 2017, at 6:51 PM, Raju Bairishetti <r...@apache.org> wrote:
>>>>
>>>> Thanks Michael for the response.
>>>>
>>>> On Wed, Jan 18, 2017 at 2:45 AM, Michael Allman <mich...@videoamp.com> wrote:
>>>>
>>>>> Hi Raju,
>>>>>
>>>>> I'm sorry this isn't working for you. I helped author this functionality and will try my best to help.
>>>>>
>>>>> First, I'm curious why you set spark.sql.hive.convertMetastoreParquet to false?
>>>>
>>>> I had set it as suggested in SPARK-6910 and the corresponding pull requests. It did not work for me without setting the spark.sql.hive.convertMetastoreParquet property.
>>>>> Can you link specifically to the jira issue or spark pr you referred to? The first thing I would try is setting spark.sql.hive.convertMetastoreParquet to true. Setting that to false might also explain why you're getting parquet decode errors. If you're writing your table data with Spark's parquet file writer and reading with Hive's parquet file reader, there may be an incompatibility accounting for the decode errors you're seeing.
>>>>
>>>> https://issues.apache.org/jira/browse/SPARK-6910 . My main motivation is to avoid fetching all the partitions. We reverted the spark.sql.hive.convertMetastoreParquet setting to true because of the decoding errors. After reverting it, Spark is fetching all partitions of the table again.
>>>>
>>>>> Can you reply with your table's Hive metastore schema, including partition schema?
>>>>
>>>> col1                    string
>>>> col2                    string
>>>> year                    int
>>>> month                   int
>>>> day                     int
>>>> hour                    int
>>>>
>>>> # Partition Information
>>>> # col_name              data_type       comment
>>>> year                    int
>>>> month                   int
>>>> day                     int
>>>> hour                    int
>>>> venture                 string
>>>>
>>>>> Where are the table's files located?
>>>>
>>>> In Hadoop, under a user directory.
>>>>
>>>>> If you do a "show partitions <dbname>.<tablename>" in the spark-sql shell, does it show the partitions you expect to see? If not, run "msck repair table <dbname>.<tablename>".
>>>>
>>>> Yes. It is listing the partitions.
>>>>
>>>>> Cheers,
>>>>>
>>>>> Michael
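If it helps anyone reproducing this, the same checks can be run from a HiveContext instead of the Hive CLI. This is only an illustrative sketch, with rajub.dummy standing in for the real table:

    // List the partitions the metastore knows about for the table.
    sqlContext.sql("SHOW PARTITIONS rajub.dummy").show(100, false)

    // If partition directories exist on disk but are missing from the metastore,
    // ask Hive to re-scan the table location and register them.
    sqlContext.sql("MSCK REPAIR TABLE rajub.dummy")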
>>>>> On Jan 17, 2017, at 12:02 AM, Raju Bairishetti <r...@apache.org> wrote:
>>>>>
>>>>> Had a high-level look into the code. It seems the getHiveQlPartitions method from HiveMetastoreCatalog is getting called irrespective of the metastorePartitionPruning conf value.
>>>>>
>>>>> It should not fetch all partitions if we set metastorePartitionPruning to true (the default value for this is false):
>>>>>
>>>>> def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = {
>>>>>   val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) {
>>>>>     table.getPartitions(predicates)
>>>>>   } else {
>>>>>     allPartitions
>>>>>   }
>>>>>   ...
>>>>>
>>>>> def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] =
>>>>>   client.getPartitionsByFilter(this, predicates)
>>>>>
>>>>> lazy val allPartitions = table.getAllPartitions
>>>>>
>>>>> But somehow getAllPartitions is getting called even though metastorePartitionPruning is set to true.
>>>>>
>>>>> Am I missing something or looking at the wrong place?
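One way to check which metastore call is actually being made is to run the same filtered query with the flag on and off and watch the HiveMetaStore audit log. A rough sketch, with the table and filter as placeholders:

    // If pruning kicks in, the metastore audit log should show get_partitions_by_filter;
    // otherwise it logs a plain get_partitions call (i.e. every partition is fetched).
    for (flag <- Seq("true", "false")) {
      sqlContext.setConf("spark.sql.hive.metastorePartitionPruning", flag)
      val rows = sqlContext.sql("select count(1) from rajub.dummy where year = '2017'").collect()
      println(s"metastorePartitionPruning=$flag -> ${rows.mkString(",")}")
    }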
>>>>> On Tue, Jan 17, 2017 at 4:01 PM, Raju Bairishetti <r...@apache.org> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> Spark SQL is generating the query plan with all partitions' information even though we apply filters on the partition columns in the query. Due to this, the Spark driver/Hive metastore is hitting OOM, as each table has lots of partitions.
>>>>>>
>>>>>> We can confirm from the Hive audit logs that it tries to fetch all partitions from the Hive metastore:
>>>>>>
>>>>>> 2016-12-28 07:18:33,749 INFO [pool-4-thread-184]: HiveMetaStore.audit (HiveMetaStore.java:logAuditEvent(371)) - ugi=rajub ip=/x.x.x.x cmd=get_partitions : db=xxxx tbl=xxxxx
>>>>>>
>>>>>> Configured the following parameters in the Spark conf to fix the above issue (source: the Spark JIRA and the corresponding GitHub pull request):
>>>>>>
>>>>>> spark.sql.hive.convertMetastoreParquet false
>>>>>> spark.sql.hive.metastorePartitionPruning true
>>>>>>
>>>>>> plan: rdf.explain
>>>>>> == Physical Plan ==
>>>>>> HiveTableScan [rejection_reason#626], MetastoreRelation dbname, tablename, None, [(year#314 = 2016),(month#315 = 12),(day#316 = 28),(hour#317 = 2),(venture#318 = DEFAULT)]
>>>>>>
>>>>>> With this, the get_partitions_by_filter method is called and only the required partitions are fetched.
>>>>>>
>>>>>> But we are seeing parquet decode errors in our applications frequently after this. It looks like these decoding errors are because of changing the serde from Spark's built-in serde to the Hive serde.
>>>>>>
>>>>>> I feel like fixing the query plan generation in Spark SQL is the right approach instead of forcing users to use the Hive serde.
>>>>>>
>>>>>> Is there any workaround/way to fix this issue? I would like to hear more thoughts on this :)
>>>>>>
>>>>>> On Tue, Jan 17, 2017 at 4:00 PM, Raju Bairishetti <r...@apache.org> wrote:
>>>>>>
>>>>>>> Had a high-level look into the code. It seems the getHiveQlPartitions method from HiveMetastoreCatalog is getting called irrespective of the metastorePartitionPruning conf value.
>>>>>>>
>>>>>>> It should not fetch all partitions if we set metastorePartitionPruning to true (the default value for this is false):
>>>>>>>
>>>>>>> def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = {
>>>>>>>   val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) {
>>>>>>>     table.getPartitions(predicates)
>>>>>>>   } else {
>>>>>>>     allPartitions
>>>>>>>   }
>>>>>>>   ...
>>>>>>>
>>>>>>> def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] =
>>>>>>>   client.getPartitionsByFilter(this, predicates)
>>>>>>>
>>>>>>> lazy val allPartitions = table.getAllPartitions
>>>>>>>
>>>>>>> But somehow getAllPartitions is getting called even though metastorePartitionPruning is set to true.
>>>>>>>
>>>>>>> Am I missing something or looking at the wrong place?
>>>>>>>
>>>>>>> On Mon, Jan 16, 2017 at 12:53 PM, Raju Bairishetti <r...@apache.org> wrote:
>>>>>>>
>>>>>>>> Waiting for suggestions/help on this...
>>>>>>>>
>>>>>>>> On Wed, Jan 11, 2017 at 12:14 PM, Raju Bairishetti <r...@apache.org> wrote:
>>>>>>>>
>>>>>>>>> Hello,
>>>>>>>>>
>>>>>>>>> Spark SQL is generating the query plan with all partitions' information even though we apply filters on the partition columns in the query. Due to this, the Spark driver/Hive metastore is hitting OOM, as each table has lots of partitions.
>>>>>>>>>
>>>>>>>>> We can confirm from the Hive audit logs that it tries to fetch all partitions from the Hive metastore:
>>>>>>>>>
>>>>>>>>> 2016-12-28 07:18:33,749 INFO [pool-4-thread-184]: HiveMetaStore.audit (HiveMetaStore.java:logAuditEvent(371)) - ugi=rajub ip=/x.x.x.x cmd=get_partitions : db=xxxx tbl=xxxxx
>>>>>>>>>
>>>>>>>>> Configured the following parameters in the Spark conf to fix the above issue (source: the Spark JIRA and the corresponding GitHub pull request):
>>>>>>>>>
>>>>>>>>> spark.sql.hive.convertMetastoreParquet false
>>>>>>>>> spark.sql.hive.metastorePartitionPruning true
>>>>>>>>>
>>>>>>>>> plan: rdf.explain
>>>>>>>>> == Physical Plan ==
>>>>>>>>> HiveTableScan [rejection_reason#626], MetastoreRelation dbname, tablename, None, [(year#314 = 2016),(month#315 = 12),(day#316 = 28),(hour#317 = 2),(venture#318 = DEFAULT)]
>>>>>>>>>
>>>>>>>>> With this, the get_partitions_by_filter method is called and only the required partitions are fetched.
>>>>>>>>>
>>>>>>>>> But we are seeing parquet decode errors in our applications frequently after this. It looks like these decoding errors are because of changing the serde from Spark's built-in serde to the Hive serde.
>>>>>>>>>
>>>>>>>>> I feel like fixing the query plan generation in Spark SQL is the right approach instead of forcing users to use the Hive serde.
>>>>>>>>>
>>>>>>>>> Is there any workaround/way to fix this issue? I would like to hear more thoughts on this :)

--
------
Thanks,
Raju Bairishetti,
www.lazada.com