Re: parquet partition discovery
Back to the user list so everyone can see the result of the discussion...

> Ah. It all makes sense now. The issue is that when I created the parquet files, I included an unnecessary directory name (data.parquet) below the partition directories. It's just a leftover from when I started with Michael's sample code, and it only made sense before I added the partition directories. I probably thought it was some magic name that was required when Spark scanned for parquet files. The structure looks something like this:
>
> drwxr-xr-x   - user supergroup   0 2015-04-02 13:17 hdfs://host/tablename/date=20150302/sym=A/data.parquet/...
>
> If I just move all the files up a level (there goes a day of work), the existing code should work fine. Whether it's useful to handle intermediate non-partition directories, or whether that just creates some extra risk, I can't say, since I'm new to all the technology in this whole stack.

I'm mixed here. There is always a tradeoff between silently ignoring structure that people might not be aware of (and thus might be a bug) and just working. Having this as an option at least certainly seems reasonable. I'd be curious if anyone has other thoughts?

> Unfortunately, it takes many minutes (even with mergeSchema=false) to create the RDD. It appears that the whole data store will still be recursively traversed (even with mergeSchema=false, a manually specified schema, and a partition spec [which I can't pass in through a public API]) so that all of the metadata FileStatuses can be cached. In my case I'm going to have years of data, so there's no way that will be feasible. Should I just explicitly load the partitions I want instead of using partition discovery? Is there any plan for a less aggressive version of partition support, where metadata is only cached for partitions that are used in queries?

We improved the speed here in 1.3.1, so I'd be curious if that helps. We definitely need to continue to speed things up here though.
We have to enumerate all the partitions so we know what data to read when a query comes in, but I do think we can parallelize it or something.
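The failure mode described above can be illustrated with a small, self-contained sketch. This is not Spark's actual implementation (the object and method names here are made up for illustration); it just models the strict interpretation of the layout: walk up from the directory holding the data files, collecting key=value segments, and stop at the first segment that doesn't match. An extra level like data.parquet below the partition directories therefore hides every partition column.

```scala
// Toy sketch of strict bottom-up partition parsing -- NOT Spark's code.
object StrictPartitionParse {
  // Walk upward from the innermost directory, collecting key=value
  // segments and stopping at the first segment that doesn't match.
  def parse(dirPath: String): Seq[(String, String)] = {
    dirPath.stripPrefix("/").split("/").toSeq
      .reverse
      .takeWhile(_.contains("="))
      .map { s =>
        val parts = s.split("=", 2)
        (parts(0), parts(1))   // column name -> value
      }
      .reverse                 // restore outermost-partition-first order
  }
}

// parse("/tablename/date=20150302/sym=A")
//   yields (date,20150302), (sym,A)
// parse("/tablename/date=20150302/sym=A/data.parquet")
//   yields nothing: the extra leaf directory masks both partition columns
```

Moving the files up a level, as described above, makes the innermost directory a key=value segment again, so discovery works.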
Re: parquet partition discovery
On 4/9/15 3:09 AM, Michael Armbrust wrote:

> I'm mixed here. There is always a tradeoff between silently ignoring structure that people might not be aware of (and thus might be a bug) and just working. Having this as an option at least certainly seems reasonable. I'd be curious if anyone has other thoughts?

Take the following directory name as an example:

    /path/to/partition/a=1/random/b=foo

One possible approach: we grab both a=1 and b=foo, then either report "random" by throwing an exception, or ignore it with a WARN log.
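That lenient approach could look roughly like the following. This is a toy illustration under the assumptions of the example above, not Spark code (the names are hypothetical): collect every key=value segment wherever it appears in the path, and hand back the remaining segments so the caller can log a WARN or throw.

```scala
// Toy sketch of lenient partition discovery -- NOT Spark's code.
object LenientPartitionDiscovery {
  // Returns (partition columns, non-partition segments). The caller
  // decides whether to WARN about the leftovers or reject the path.
  def discover(path: String): (Map[String, String], Seq[String]) = {
    val segments = path.stripPrefix("/").split("/").toSeq
    val (partitionDirs, others) = segments.partition(_.contains("="))
    val columns = partitionDirs.map { s =>
      val parts = s.split("=", 2)
      parts(0) -> parts(1)
    }.toMap
    (columns, others)
  }
}

// discover("/path/to/partition/a=1/random/b=foo")
//   columns: a -> 1, b -> foo
//   leftovers include "random" (and the leading path segments)
```

The tradeoff discussed above shows up directly here: "random" is silently skipped unless the caller inspects the second element of the result.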
parquet partition discovery
I was unable to get this feature to work in 1.3.0. I tried building off master and it still wasn't working for me. So I dug into the code, and I'm not sure how parsePartition() was ever working: the while loop which walks up the parent directories in the path always terminates after a single iteration. I made a minor change and partition discovery appears to work now. Specifically, I changed

    var chopped = path
    while (!finished) {
      val maybeColumn = parsePartitionColumn(chopped.getName, defaultPartitionName)
      maybeColumn.foreach(columns += _)
      chopped = chopped.getParent
      finished = maybeColumn.isEmpty || chopped.getParent == null
    }

to

    var chopped = path
    while (chopped != null) {
      val maybeColumn = parsePartitionColumn(chopped.getName, defaultPartitionName)
      maybeColumn.foreach(columns += _)
      chopped = chopped.getParent
    }

Because the leaf nodes are always named data.parquet, the original loop was terminating immediately after the first iteration. The only other thought I had is that the loop may have been intended to walk up the path until it stopped finding partition directories. In that case, the loop would work fine as is, but chopped should be initialized to path.getParent rather than path. I'm completely new to Spark, so it's possible that I misunderstood the intent here completely, but if not, I'm happy to open an issue and submit a pull request for whichever approach is the correct one.
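The fixed loop can be exercised standalone by substituting java.nio.file.Path for Hadoop's Path. This is a self-contained sketch, not the actual patch: parsePartitionColumn here is a simplified stand-in for Spark's helper (the real one also handles the default partition name and typed values).

```scala
import java.nio.file.{Path, Paths}
import scala.collection.mutable.ArrayBuffer

// Self-contained model of the corrected loop -- NOT the actual Spark patch.
object ParsePartitionFix {
  // Simplified stand-in: "sym=A" yields Some(("sym", "A")), else None.
  def parsePartitionColumn(name: String): Option[(String, String)] =
    name.split("=", 2) match {
      case Array(k, v) => Some((k, v))
      case _           => None
    }

  // Walk all the way up to the root instead of stopping after the first
  // non-partition segment (e.g. a leaf directory named data.parquet).
  def parsePartition(path: Path): Seq[(String, String)] = {
    val columns = ArrayBuffer.empty[(String, String)]
    var chopped: Path = path
    while (chopped != null) {
      Option(chopped.getFileName)        // getFileName is null at the root
        .map(_.toString)
        .flatMap(parsePartitionColumn)
        .foreach(columns += _)
      chopped = chopped.getParent
    }
    columns.reverse.toSeq                // outermost partition column first
  }
}

// ParsePartitionFix.parsePartition(Paths.get("/tablename/date=20150302/sym=A/data.parquet"))
//   now recovers (date,20150302) and (sym,A) despite the data.parquet leaf.
```

With the original loop, the very first iteration on the data.parquet leaf returns None, sets finished = true, and no partition columns are ever collected; the version above keeps walking regardless.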
Issue of sqlContext.createExternalTable with parquet partition discovery after changing folder structure
Hi Spark Users, I'm testing the new 1.3 feature of parquet partition discovery. I have 2 sub-folders, each with 800 rows:

/data/table1/key=1
/data/table1/key=2

In spark-shell, I run this command:

    val t = sqlContext.createExternalTable("table1", "hdfs:///data/table1", "parquet")
    t.count

It shows 1600 successfully. But after that, I add a new folder /data/table1/key=3, then run t.count again; it still gives me 1600, not 2400. If I restart spark-shell and then run

    val t = sqlContext.table("table1")
    t.count

it's 2400 now. I'm wondering whether there is a partition cache in the driver. I tried setting spark.sql.parquet.cacheMetadata to false and testing again, but unfortunately it doesn't help. How can I disable this partition cache or force a refresh of the cache? Thanks
Re: Issue of sqlContext.createExternalTable with parquet partition discovery after changing folder structure
You need to refresh the external table manually after updating the data source outside Spark SQL:

- via the Scala API: sqlContext.refreshTable("table1")
- via SQL: REFRESH TABLE table1;

Cheng

On 4/4/15 5:24 PM, Rex Xiong wrote:

> [...] I'm wondering whether there is a partition cache in the driver. I tried setting spark.sql.parquet.cacheMetadata to false and testing again, but unfortunately it doesn't help. How can I disable this partition cache or force a refresh of the cache? Thanks
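The stale count can be modeled with a toy cache. This is purely illustrative and not Spark's code (every name below is made up): partition metadata is captured once when the table is created and served from the cache until an explicit refresh re-scans the directory, which is why adding key=3 outside Spark SQL didn't change t.count.

```scala
// Toy model of cached partition metadata -- NOT Spark's implementation.
object TableCacheSketch {
  // Simulated filesystem: table path -> partition directories on disk.
  var fs: Map[String, Set[String]] =
    Map("/data/table1" -> Set("key=1", "key=2"))

  private var cache = Map.empty[String, Set[String]]

  // Partitions are discovered once at creation time and cached.
  def createExternalTable(path: String): Set[String] = {
    cache += path -> fs(path)
    cache(path)
  }

  // Queries are answered from the cache, not the filesystem.
  def partitions(path: String): Set[String] = cache(path)

  // Explicit refresh re-scans the directory, like refreshTable / REFRESH TABLE.
  def refreshTable(path: String): Unit = cache += path -> fs(path)
}
```

Adding a directory to fs after createExternalTable leaves partitions() unchanged until refreshTable is called, mirroring the behavior in the question above.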