Based on what you've described, I think you should be able to use Spark's parquet reader plus partition pruning in 2.1.
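[Editor's note: a minimal sketch of the configuration discussed in this thread, assuming a Spark 2.1 deployment with Hive support; the table name rajub.dummy and partition column come from the examples quoted below, and the session setup is illustrative, not a verified fix.]

```scala
// Spark 2.1+: partition pruning works with Spark's native parquet reader
// (spark.sql.hive.convertMetastoreParquet=true, the default), so only the
// partitions matching the filter should be requested from the metastore.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("partition-pruning-check")
  .config("spark.sql.hive.convertMetastoreParquet", "true")   // default in 2.x
  .config("spark.sql.hive.metastorePartitionPruning", "true") // push predicates to the metastore
  .enableHiveSupport()
  .getOrCreate()

// Partitioned table from the thread (year, month partition columns)
val df = spark.sql("SELECT count(1) FROM rajub.dummy WHERE year = '2017'")

// With pruning working, the scan should list only year=2017 partition
// directories rather than every InputPath of the table.
df.explain(true)
```

A quick sanity check is to compare the InputPaths in the physical plan (as in the plans pasted below) before and after the upgrade: a pruned plan lists only the matching partition directories.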
> On Jan 17, 2017, at 10:44 PM, Raju Bairishetti <r...@apache.org> wrote:
>
> Thanks for the detailed explanation. Is it completely fixed in spark-2.1.0?
>
> We are giving very high memory to the spark driver to avoid the OOM (heap
> space / GC overhead limit) errors in the Spark app. But when we run two or
> three jobs together, they bring down the Hive metastore. We had to
> forcefully drop older partitions to avoid frequent outages of the Hive
> metastore.
>
> On Wed, Jan 18, 2017 at 2:09 PM, Michael Allman <mich...@videoamp.com> wrote:
> I think I understand. Partition pruning for the case where
> spark.sql.hive.convertMetastoreParquet is true was not added to Spark until
> 2.1.0. I think that in previous versions it only worked when
> spark.sql.hive.convertMetastoreParquet is false. Unfortunately, that
> configuration gives you data decoding errors. If it's possible for you to
> write all of your data with Hive, then you should be able to read it
> without decoding errors and with partition pruning turned on. Another
> possibility is running your Spark app with a very large maximum heap
> configuration, like 8g or even 16g. However, loading all of that partition
> metadata can be quite slow for very large tables. I'm sorry I can't think
> of a better solution for you.
>
> Michael
>
>> On Jan 17, 2017, at 8:59 PM, Raju Bairishetti <r...@apache.org> wrote:
>>
>> Tested on both 1.5.2 and 1.6.1.
>>
>> On Wed, Jan 18, 2017 at 12:52 PM, Michael Allman <mich...@videoamp.com> wrote:
>> What version of Spark are you running?
>>
>>> On Jan 17, 2017, at 8:42 PM, Raju Bairishetti <r...@apache.org> wrote:
>>>
>>> describe dummy;
>>> OK
>>> sample    string
>>> year      string
>>> month     string
>>>
>>> # Partition Information
>>> # col_name    data_type    comment
>>> year      string
>>> month     string
>>>
>>> val df = sqlContext.sql("select count(1) from rajub.dummy where year='2017'")
>>> df: org.apache.spark.sql.DataFrame = [_c0: bigint]
>>>
>>> scala> df.explain
>>> == Physical Plan ==
>>> TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[_c0#3070L])
>>> +- TungstenExchange SinglePartition, None
>>>    +- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#3076L])
>>>       +- Scan ParquetRelation: rajub.dummy[] InputPaths:
>>>          maprfs:/user/rajub/dummy/sample/year=2016/month=10,
>>>          maprfs:/user/rajub/dummy/sample/year=2016/month=11,
>>>          maprfs:/user/rajub/dummy/sample/year=2016/month=9,
>>>          maprfs:/user/rajub/dummy/sample/year=2017/month=10,
>>>          maprfs:/user/rajub/dummy/sample/year=2017/month=11,
>>>          maprfs:/user/rajub/dummy/sample/year=2017/month=9
>>>
>>> On Wed, Jan 18, 2017 at 12:25 PM, Michael Allman <mich...@videoamp.com> wrote:
>>> Can you paste the actual query plan here, please?
>>>
>>>> On Jan 17, 2017, at 7:38 PM, Raju Bairishetti <r...@apache.org> wrote:
>>>>
>>>> On Wed, Jan 18, 2017 at 11:13 AM, Michael Allman <mich...@videoamp.com> wrote:
>>>> What is the physical query plan after you set spark.sql.hive.convertMetastoreParquet to true?
>>>> The physical plan contains all the partition locations.
>>>>
>>>> Michael
>>>>
>>>>> On Jan 17, 2017, at 6:51 PM, Raju Bairishetti <r...@apache.org> wrote:
>>>>>
>>>>> Thanks Michael for the response.
>>>>>
>>>>> On Wed, Jan 18, 2017 at 2:45 AM, Michael Allman <mich...@videoamp.com> wrote:
>>>>> Hi Raju,
>>>>>
>>>>> I'm sorry this isn't working for you. I helped author this functionality
>>>>> and will try my best to help.
>>>>>
>>>>> First, I'm curious why you set spark.sql.hive.convertMetastoreParquet to false?
>>>>> I had set it as suggested in SPARK-6910 and the corresponding pull
>>>>> requests. It did not work for me without setting the
>>>>> spark.sql.hive.convertMetastoreParquet property.
>>>>>
>>>>> Can you link specifically to the jira issue or spark pr you referred to?
>>>>> The first thing I would try is setting
>>>>> spark.sql.hive.convertMetastoreParquet to true. Setting that to false
>>>>> might also explain why you're getting parquet decode errors. If you're
>>>>> writing your table data with Spark's parquet file writer and reading with
>>>>> Hive's parquet file reader, there may be an incompatibility accounting
>>>>> for the decode errors you're seeing.
>>>>>
>>>>> https://issues.apache.org/jira/browse/SPARK-6910. My main motivation is
>>>>> to avoid fetching all the partitions. We reverted the
>>>>> spark.sql.hive.convertMetastoreParquet setting to true because of the
>>>>> decoding errors. After reverting it, Spark fetches all partitions from
>>>>> the table.
>>>>>
>>>>> Can you reply with your table's Hive metastore schema, including partition schema?
>>>>> col1 string
>>>>> col2 string
>>>>> year int
>>>>> month int
>>>>> day int
>>>>> hour int
>>>>>
>>>>> # Partition Information
>>>>> # col_name    data_type    comment
>>>>> year int
>>>>> month int
>>>>> day int
>>>>> hour int
>>>>> venture string
>>>>>
>>>>> Where are the table's files located?
>>>>> In Hadoop, under some user directory.
>>>>>
>>>>> If you do a "show partitions <dbname>.<tablename>" in the spark-sql
>>>>> shell, does it show the partitions you expect to see?
If not, run "msck repair table <dbname>.<tablename>".
>>>>> Yes, it is listing the partitions.
>>>>>
>>>>> Cheers,
>>>>>
>>>>> Michael
>>>>>
>>>>>> On Jan 17, 2017, at 12:02 AM, Raju Bairishetti <r...@apache.org> wrote:
>>>>>>
>>>>>> Had a high-level look into the code. It seems the getHiveQlPartitions
>>>>>> method from HiveMetastoreCatalog is getting called irrespective of the
>>>>>> metastorePartitionPruning conf value.
>>>>>>
>>>>>> It should not fetch all partitions if we set metastorePartitionPruning
>>>>>> to true (the default value for this is false):
>>>>>>
>>>>>> def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = {
>>>>>>   val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) {
>>>>>>     table.getPartitions(predicates)
>>>>>>   } else {
>>>>>>     allPartitions
>>>>>>   }
>>>>>>   ...
>>>>>>
>>>>>> def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] =
>>>>>>   client.getPartitionsByFilter(this, predicates)
>>>>>>
>>>>>> lazy val allPartitions = table.getAllPartitions
>>>>>>
>>>>>> But somehow getAllPartitions is getting called even after setting
>>>>>> metastorePartitionPruning to true.
>>>>>> Am I missing something or looking at the wrong place?
>>>>>>
>>>>>> On Tue, Jan 17, 2017 at 4:01 PM, Raju Bairishetti <r...@apache.org> wrote:
>>>>>> Hello,
>>>>>>
>>>>>> Spark SQL is generating a query plan with all partitions' information
>>>>>> even though we apply filters on partitions in the query. Due to this,
>>>>>> the Spark driver/Hive metastore is hitting OOM, as each table has lots
>>>>>> of partitions.
>>>>>>
>>>>>> We can confirm from the Hive audit logs that it tries to fetch all
>>>>>> partitions from the Hive metastore.
>>>>>>
>>>>>> 2016-12-28 07:18:33,749 INFO [pool-4-thread-184]: HiveMetaStore.audit
>>>>>> (HiveMetaStore.java:logAuditEvent(371)) - ugi=rajub ip=/x.x.x.x
>>>>>> cmd=get_partitions : db=xxxx tbl=xxxxx
>>>>>>
>>>>>> Configured the following parameters in the Spark conf to fix the above
>>>>>> issue (source: spark-jira & github pull request):
>>>>>> spark.sql.hive.convertMetastoreParquet false
>>>>>> spark.sql.hive.metastorePartitionPruning true
>>>>>>
>>>>>> plan: rdf.explain
>>>>>> == Physical Plan ==
>>>>>> HiveTableScan [rejection_reason#626], MetastoreRelation dbname,
>>>>>> tablename, None, [(year#314 = 2016),(month#315 = 12),(day#316 =
>>>>>> 28),(hour#317 = 2),(venture#318 = DEFAULT)]
>>>>>>
>>>>>> The get_partitions_by_filter method is called and fetches only the
>>>>>> required partitions.
>>>>>>
>>>>>> But we are seeing parquet decode errors in our applications frequently
>>>>>> after this. Looks like these decoding errors were because of changing
>>>>>> the serde from spark-builtin to the hive serde.
>>>>>>
>>>>>> I feel that fixing query plan generation in spark-sql is the right
>>>>>> approach instead of forcing users to use the hive serde.
>>>>>>
>>>>>> Is there any workaround/way to fix this issue? I would like to hear
>>>>>> more thoughts on this :)
>>>>>>
>>>>>> On Tue, Jan 17, 2017 at 4:00 PM, Raju Bairishetti <r...@apache.org> wrote:
>>>>>> Had a high-level look into the code. It seems the getHiveQlPartitions
>>>>>> method from HiveMetastoreCatalog is getting called irrespective of the
>>>>>> metastorePartitionPruning conf value.
>>>>>>
>>>>>> It should not fetch all partitions if we set metastorePartitionPruning
>>>>>> to true (the default value for this is false):
>>>>>>
>>>>>> def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = {
>>>>>>   val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) {
>>>>>>     table.getPartitions(predicates)
>>>>>>   } else {
>>>>>>     allPartitions
>>>>>>   }
>>>>>>   ...
>>>>>> def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] =
>>>>>>   client.getPartitionsByFilter(this, predicates)
>>>>>>
>>>>>> lazy val allPartitions = table.getAllPartitions
>>>>>>
>>>>>> But somehow getAllPartitions is getting called even after setting
>>>>>> metastorePartitionPruning to true.
>>>>>> Am I missing something or looking at the wrong place?
>>>>>>
>>>>>> On Mon, Jan 16, 2017 at 12:53 PM, Raju Bairishetti <r...@apache.org> wrote:
>>>>>> Waiting for suggestions/help on this...
>>>>>>
>>>>>> On Wed, Jan 11, 2017 at 12:14 PM, Raju Bairishetti <r...@apache.org> wrote:
>>>>>> Hello,
>>>>>>
>>>>>> Spark SQL is generating a query plan with all partitions' information
>>>>>> even though we apply filters on partitions in the query. Due to this,
>>>>>> the Spark driver/Hive metastore is hitting OOM, as each table has lots
>>>>>> of partitions.
>>>>>>
>>>>>> We can confirm from the Hive audit logs that it tries to fetch all
>>>>>> partitions from the Hive metastore.
>>>>>>
>>>>>> 2016-12-28 07:18:33,749 INFO [pool-4-thread-184]: HiveMetaStore.audit
>>>>>> (HiveMetaStore.java:logAuditEvent(371)) - ugi=rajub ip=/x.x.x.x
>>>>>> cmd=get_partitions : db=xxxx tbl=xxxxx
>>>>>>
>>>>>> Configured the following parameters in the Spark conf to fix the above
>>>>>> issue (source: spark-jira & github pull request):
>>>>>> spark.sql.hive.convertMetastoreParquet false
>>>>>> spark.sql.hive.metastorePartitionPruning true
>>>>>>
>>>>>> plan: rdf.explain
>>>>>> == Physical Plan ==
>>>>>> HiveTableScan [rejection_reason#626], MetastoreRelation dbname,
>>>>>> tablename, None, [(year#314 = 2016),(month#315 = 12),(day#316 =
>>>>>> 28),(hour#317 = 2),(venture#318 = DEFAULT)]
>>>>>>
>>>>>> The get_partitions_by_filter method is called and fetches only the
>>>>>> required partitions.
>>>>>>
>>>>>> But we are seeing parquet decode errors in our applications frequently
>>>>>> after this.
Looks like these decoding errors were because of changing the serde from
>>>>>> spark-builtin to the hive serde.
>>>>>>
>>>>>> I feel that fixing query plan generation in spark-sql is the right
>>>>>> approach instead of forcing users to use the hive serde.
>>>>>>
>>>>>> Is there any workaround/way to fix this issue? I would like to hear
>>>>>> more thoughts on this :)
>>>>>>
>>>>>> ------
>>>>>> Thanks,
>>>>>> Raju Bairishetti,
>>>>>> www.lazada.com
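[Editor's note: the prune-vs-fetch-all branch in the getHiveQlPartitions snippet quoted above is the crux of the thread. The following is a standalone illustration in plain Scala, not Spark source; the Partition case class, the partition ranges, and the predicate are invented for the example.]

```scala
// Illustration only: models the decision in the quoted getHiveQlPartitions
// using plain Scala collections instead of metastore calls.
case class Partition(year: Int, month: Int)

object PruningSketch {
  val allPartitions: Seq[Partition] =
    for (y <- 2015 to 2017; m <- 1 to 12) yield Partition(y, m)

  // Stand-in for client.getPartitionsByFilter: only matching partitions
  // are materialized when pruning is on.
  def getPartitionsByFilter(pred: Partition => Boolean): Seq[Partition] =
    allPartitions.filter(pred)

  def getHiveQlPartitions(metastorePartitionPruning: Boolean,
                          pred: Partition => Boolean): Seq[Partition] =
    if (metastorePartitionPruning) getPartitionsByFilter(pred)
    else allPartitions // every partition fetched: the OOM path described above

  def main(args: Array[String]): Unit = {
    val pruned = getHiveQlPartitions(metastorePartitionPruning = true, _.year == 2017)
    val full   = getHiveQlPartitions(metastorePartitionPruning = false, _.year == 2017)
    println(s"pruned=${pruned.size} full=${full.size}") // pruned=12 full=36
  }
}
```

The thread's complaint is that, before the 2.1 fix, the second branch (allPartitions) was effectively taken for converted parquet tables regardless of the metastorePartitionPruning setting.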