Range partition for parquet file?
Hi, I have a Spark job that outputs a DataFrame containing a column named Id, which is a GUID string. We will use Id to filter data in another Spark application, so it should be a partition key. I found these two methods on the Internet:

1. DataFrame.write.partitionBy("Id") would help, but the possible value space for a GUID is too big; I would prefer a range partition that splits it evenly into 100 partitions.
2. Another way is DataFrame.repartition("Id"), but the result seems to stay only in memory; once it's saved and then loaded from another Spark application, we need to repartition it again?

After all, what is the relationship between Parquet partitions and DataFrame.repartition? E.g. the Parquet data is stored physically under /year=X/month=Y; I load this data into a DataFrame, then call DataFrame.repartition("Id") and run this query:

df.filter("year=2016 and month=5 and Id='...'")

Will Parquet folder pruning still work? Or has it already been repartitioned by Id, so it needs to scan all year/month combinations? Thanks
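One common way to get a bounded, even partition count out of an unbounded GUID space is to bucket by a stable hash rather than a true range partition. A minimal sketch outside Spark (the `guid_bucket` helper and the 100-bucket choice are illustrative assumptions, not anything from the thread); a derived column computed this way could then be used with `DataFrame.write.partitionBy("bucket")`, so a filter on a known Id can compute its bucket and prune to a single directory:

```python
import hashlib
import uuid

NUM_BUCKETS = 100  # target partition count; an assumption, tune as needed


def guid_bucket(guid: str, num_buckets: int = NUM_BUCKETS) -> int:
    """Map a GUID string to a stable bucket in [0, num_buckets).

    Uses md5 of the lowercased string so the mapping is identical across
    processes and runs (unlike Python's salted built-in hash()).
    """
    digest = hashlib.md5(guid.lower().encode("utf-8")).hexdigest()
    return int(digest, 16) % num_buckets


g = str(uuid.uuid4())
assert 0 <= guid_bucket(g) < NUM_BUCKETS
assert guid_bucket(g) == guid_bucket(g)  # deterministic across calls
```

Unlike `repartition`, which only affects the in-memory layout of the current job, a persisted bucket column survives the write and is visible to the next application that reads the data.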
Re: Issue of Hive parquet partitioned table schema mismatch
spark.sql.hive.convertMetastoreParquet is true. I can't repro the issue of scanning all partitions now... :P

Anyway, I found another email thread, "Re: Spark Sql behaves strangely with tables with a lot of partitions". I observe the same issue as Jerrick: the Spark driver calls listStatus on the whole table folder even when I load a pure Hive table (not created by Spark), which costs some time for a large partitioned table. Even though spark.sql.parquet.cacheMetadata is true, this listStatus runs before every query; it's not cached.

2015-11-05 8:50 GMT+08:00 Cheng Lian <lian.cs@gmail.com>:

> Is there any chance that "spark.sql.hive.convertMetastoreParquet" is
> turned off?
>
> Cheng
>
> On 11/4/15 5:15 PM, Rex Xiong wrote:
>
> Thanks Cheng Lian.
> I found that in 1.5, if I use Spark to create this table with partition
> discovery, partition pruning can be performed, but for my old table
> definition in pure Hive, the execution plan does a Parquet scan across
> all partitions, and it runs very slowly.
> Looks like the execution plan optimization is different.
>
> 2015-11-03 23:10 GMT+08:00 Cheng Lian <lian.cs@gmail.com>:
>
>> SPARK-11153 should be irrelevant because you are filtering on a partition
>> key, while SPARK-11153 is about Parquet filter push-down and doesn't
>> affect partition pruning.
>>
>> Cheng
>>
>> On 11/3/15 7:14 PM, Rex Xiong wrote:
>>
>> We found the query performance is very poor due to this issue:
>> https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-11153
>> We usually filter on the partition key, the date; it's a string type in
>> 1.3.1 and works great.
>> But in 1.5, it needs to do a Parquet scan for all partitions.
>>
>> On Oct 31, 2015 at 7:38 PM, "Rex Xiong" <bycha...@gmail.com> wrote:
>>
>>> Add back this thread to the email list, forgot to reply all.
>>>
>>> On Oct 31, 2015 at 7:23 PM, "Michael Armbrust" <mich...@databricks.com> wrote:
>>>
>>>> Not that I know of.
>>>>
>>>> On Sat, Oct 31, 2015 at 12:22 PM, Rex Xiong <bycha...@gmail.com> wrote:
>>>>
>>>>> Good to know that, will have a try.
>>>>> So there is no easy way to achieve it in a pure Hive way?
>>>>>
>>>>> On Oct 31, 2015 at 7:17 PM, "Michael Armbrust" <mich...@databricks.com> wrote:
>>>>>
>>>>>> Yeah, this was rewritten to be faster in Spark 1.5. We use it with
>>>>>> 10,000s of partitions.
>>>>>>
>>>>>> On Sat, Oct 31, 2015 at 7:17 AM, Rex Xiong <bycha...@gmail.com> wrote:
>>>>>>
>>>>>>> 1.3.1
>>>>>>> Is it a lot improved in 1.5+?
>>>>>>>
>>>>>>> 2015-10-30 19:23 GMT+08:00 Michael Armbrust <mich...@databricks.com>:
>>>>>>>
>>>>>>>> We have tried the schema merging feature, but it's too slow; there
>>>>>>>>> are hundreds of partitions.
>>>>>>>>
>>>>>>>> Which version of Spark?
Re: Issue of Hive parquet partitioned table schema mismatch
We found the query performance is very poor due to this issue:
https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-11153
We usually filter on the partition key, the date; it's a string type in 1.3.1 and works great. But in 1.5, it needs to do a Parquet scan for all partitions.

On Oct 31, 2015 at 7:38 PM, "Rex Xiong" <bycha...@gmail.com> wrote:

> Add back this thread to the email list, forgot to reply all.
>
> On Oct 31, 2015 at 7:23 PM, "Michael Armbrust" <mich...@databricks.com> wrote:
>
>> Not that I know of.
>>
>> On Sat, Oct 31, 2015 at 12:22 PM, Rex Xiong <bycha...@gmail.com> wrote:
>>
>>> Good to know that, will have a try.
>>> So there is no easy way to achieve it in a pure Hive way?
>>>
>>> On Oct 31, 2015 at 7:17 PM, "Michael Armbrust" <mich...@databricks.com> wrote:
>>>
>>>> Yeah, this was rewritten to be faster in Spark 1.5. We use it with
>>>> 10,000s of partitions.
>>>>
>>>> On Sat, Oct 31, 2015 at 7:17 AM, Rex Xiong <bycha...@gmail.com> wrote:
>>>>
>>>>> 1.3.1
>>>>> Is it a lot improved in 1.5+?
>>>>>
>>>>> 2015-10-30 19:23 GMT+08:00 Michael Armbrust <mich...@databricks.com>:
>>>>>
>>>>>> We have tried the schema merging feature, but it's too slow; there
>>>>>>> are hundreds of partitions.
>>>>>>
>>>>>> Which version of Spark?
Re: Issue of Hive parquet partitioned table schema mismatch
Add back this thread to the email list, forgot to reply all.

On Oct 31, 2015 at 7:23 PM, "Michael Armbrust" <mich...@databricks.com> wrote:

> Not that I know of.
>
> On Sat, Oct 31, 2015 at 12:22 PM, Rex Xiong <bycha...@gmail.com> wrote:
>
>> Good to know that, will have a try.
>> So there is no easy way to achieve it in a pure Hive way?
>>
>> On Oct 31, 2015 at 7:17 PM, "Michael Armbrust" <mich...@databricks.com> wrote:
>>
>>> Yeah, this was rewritten to be faster in Spark 1.5. We use it with
>>> 10,000s of partitions.
>>>
>>> On Sat, Oct 31, 2015 at 7:17 AM, Rex Xiong <bycha...@gmail.com> wrote:
>>>
>>>> 1.3.1
>>>> Is it a lot improved in 1.5+?
>>>>
>>>> 2015-10-30 19:23 GMT+08:00 Michael Armbrust <mich...@databricks.com>:
>>>>
>>>>> We have tried the schema merging feature, but it's too slow; there
>>>>>> are hundreds of partitions.
>>>>>
>>>>> Which version of Spark?
Issue of Hive parquet partitioned table schema mismatch
Hi folks, I have a Hive external table with partitions. Every day, an app generates a new partition day=yyyy-MM-dd stored as Parquet and runs an add-partition Hive command. In some cases, we add an additional column to new partitions and update the Hive table schema; then a query across new and old partitions fails with this exception:

org.apache.hive.service.cli.HiveSQLException: org.apache.spark.sql.AnalysisException: cannot resolve 'newcolumn' given input columns

We have tried the schema merging feature, but it's too slow; there are hundreds of partitions. Is it possible to bypass this schema check and return a default value (such as null) for missing columns? Thank you
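The behavior the question asks for (a default null for columns absent from older partitions) amounts to padding each partition's records out to the full table schema. A toy sketch of that semantics, independent of Spark/Hive (`align_rows`, the sample schema, and the sample rows are all made up for illustration):

```python
def align_rows(rows, full_schema):
    """Pad each record with None for columns missing from its source partition.

    `rows` are dicts keyed by column name; `full_schema` is the current
    (widest) table schema. dict.get() returns None for absent columns,
    which plays the role of the requested default value.
    """
    return [{col: row.get(col) for col in full_schema} for row in rows]


old_partition = [{"id": 1}]                     # written before the column existed
new_partition = [{"id": 2, "newcolumn": "x"}]   # written with the updated schema

aligned = align_rows(old_partition + new_partition, ["id", "newcolumn"])
# old rows now expose newcolumn as None instead of failing to resolve it
```

This is what Parquet schema merging does under the hood (at the cost of reading every part-file's footer); the thread's complaint is that no cheap per-query switch existed in that Spark version to get the same result from the metastore schema alone.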
Issue of jar dependency in yarn-cluster mode
Hi folks, In my Spark application, the executor tasks depend on snakeyaml-1.10.jar. I build it with Maven and it works fine:

spark-submit --master local --jars d:\snakeyaml-1.10.jar ...

But when I try to run it on YARN, I hit an issue; it seems the Spark executors cannot find the jar file:

spark-submit --master yarn-cluster --jars hdfs://../snakeyaml-1.10.jar ..

java.lang.NoSuchMethodError: org.yaml.snakeyaml.Yaml.<init>(Lorg/yaml/snakeyaml/constructor/BaseConstructor;)V

I checked one executor container folder: snakeyaml-1.10.jar has been successfully downloaded, and on the Spark driver page, in the Environment tab, spark.yarn.secondary.jars also contains snakeyaml-1.10.jar. I have no idea why it doesn't work. Could someone help take a look? Thanks
Re: Issue of jar dependency in yarn-cluster mode
I finally resolved this issue by adding --conf spark.executor.extraClassPath=snakeyaml-1.10.jar

2015-10-16 22:57 GMT+08:00 Rex Xiong <bycha...@gmail.com>:

> Hi folks,
>
> In my Spark application, the executor tasks depend on snakeyaml-1.10.jar.
> I build it with Maven and it works fine:
> spark-submit --master local --jars d:\snakeyaml-1.10.jar ...
>
> But when I try to run it on YARN, I hit an issue; it seems the Spark
> executors cannot find the jar file:
> spark-submit --master yarn-cluster --jars hdfs://../snakeyaml-1.10.jar ..
>
> java.lang.NoSuchMethodError:
> org.yaml.snakeyaml.Yaml.<init>(Lorg/yaml/snakeyaml/constructor/BaseConstructor;)V
>
> I checked one executor container folder: snakeyaml-1.10.jar has been
> successfully downloaded, and on the Spark driver page, in the Environment
> tab, spark.yarn.secondary.jars also contains snakeyaml-1.10.jar.
>
> I have no idea why it doesn't work. Could someone help take a look?
>
> Thanks
Jar is cached in yarn-cluster mode?
I use "spark-submit --master yarn-cluster hdfs://.../a.jar .." to submit my app to YARN. Then I updated this a.jar in HDFS and ran the command again, and found that a log line which had been removed from the code still shows up in the "yarn logs" output. Is there a cache mechanism I need to disable? Thanks
Is it possible to disable AM page proxy in Yarn client mode?
In YARN client mode, the Spark driver URL is redirected to the YARN web proxy server, but I don't want to use this dynamic name; is it possible to still use host:port as in standalone mode?
DESCRIBE FORMATTED doesn't work in Hive Thrift Server?
Hi, I try to use DESCRIBE FORMATTED for a table created in Spark, but the results are all empty. I want to get the metadata for the table; what other options are there? Thanks

+------------------------------+
| result                       |
+------------------------------+
| # col_name                   |
| col                          |
| # Detailed Table Information |
| Database:                    |
| Owner:                       |
| CreateTime:                  |
| LastAccessTime:              |
| Protect Mode:                |
| Retention:                   |
| Location:                    |
| Table Type:                  |
| Table Parameters:            |
| # Storage Information        |
| SerDe Library:               |
| InputFormat:                 |
| OutputFormat:                |
| Compressed:                  |
| Num Buckets:                 |
| Bucket Columns:              |
| Sort Columns:                |
| Storage Desc Params:         |
+------------------------------+
How to get Master UI with ZooKeeper HA setup?
Hi, We have a 3-node master setup with ZooKeeper HA. The driver can find the master with spark://xxx:xxx,xxx:xxx,xxx:xxx. But how can I find the active Master UI without looping through all 3 nodes? Thanks
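Absent a direct ZooKeeper lookup of the elected leader, a pragmatic fallback is exactly the loop the question hopes to avoid: probe each candidate master and take the first one that answers. A small sketch of that fallback (hostnames, the 8080 port, and the injected `probe` callable are all placeholder assumptions; in practice `probe` might attempt an HTTP GET against http://host:8080, the standalone Master UI default):

```python
def find_active_master(masters, probe):
    """Return the first master whose UI probe succeeds, or None.

    `probe(host)` -> bool is injected so the selection logic stays
    independent of any particular HTTP client or network setup.
    """
    for host in masters:
        if probe(host):
            return host
    return None


masters = ["node1:8080", "node2:8080", "node3:8080"]
# Stand-in probe pretending node2 currently holds the ALIVE master role:
active = find_active_master(masters, probe=lambda h: h == "node2:8080")
```

Since standby masters still serve their UI (showing STANDBY status), a real probe would need to check the reported status, not just connectivity.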
Re: Parquet Hive table become very slow on 1.3?
Yin,

Thanks for your reply. We already patched this PR into our 1.3.0.
As Xudong mentioned, we have thousands of Parquet files; the first read is very, very slow, and another app will add more files and refresh the table regularly.
Cheng Lian's PR 5334 seems able to resolve this issue: it will skip reading all footers if we set auto merge to false. But it's not done yet.

Thanks

2015-04-22 23:10 GMT+08:00 Yin Huai <yh...@databricks.com>:

> Xudong and Rex,
>
> Can you try 1.3.1? With PR 5339 <http://github.com/apache/spark/pull/5339>,
> after we get a Hive Parquet table from the metastore and convert it to our
> native Parquet code path, we will cache the converted relation. For now,
> the first access to that Hive Parquet table reads all of the footers (when
> you first refer to that table in a query or call
> sqlContext.table("hiveParquetTable")). All of your later accesses will hit
> the metadata cache.
>
> Thanks,
>
> Yin
>
> On Tue, Apr 21, 2015 at 1:13 AM, Rex Xiong <bycha...@gmail.com> wrote:
>
>> We have a similar issue with massive Parquet files. Cheng Lian, could you
>> have a look?
>>
>> 2015-04-08 15:47 GMT+08:00 Zheng, Xudong <dong...@gmail.com>:
>>
>>> Hi Cheng,
>>>
>>> I tried both these patches, and it seems they still don't resolve my
>>> issue. I found that most of the time is spent on this line in
>>> newParquet.scala:
>>>
>>> ParquetFileReader.readAllFootersInParallel(
>>>   sparkContext.hadoopConfiguration, seqAsJavaList(leaves), taskSideMetaData)
>>>
>>> which needs to read all the files under the Parquet folder, while our
>>> Parquet folder has a lot of Parquet files (near 2000); reading one file
>>> takes about 2 seconds, so it becomes very slow... And PR 5231 does not
>>> skip this step, so it does not resolve my issue.
>>>
>>> As our Parquet files are generated by a Spark job, the number of
>>> .parquet files is the same as the number of tasks, which is why we have
>>> so many files. But these files actually have the same schema. Is there
>>> any way to merge these files into one, or avoid scanning each of them?
>>>
>>> On Sat, Apr 4, 2015 at 9:47 PM, Cheng Lian <lian.cs@gmail.com> wrote:
>>>
>>>> Hey Xudong,
>>>>
>>>> We had been digging into this issue for a while, and believe PR 5339
>>>> <http://github.com/apache/spark/pull/5339> and PR 5334
>>>> <http://github.com/apache/spark/pull/5334> should fix it. There are two
>>>> problems:
>>>>
>>>> 1. Normally we cache Parquet table metadata for better performance, but
>>>> when converting Hive metastore Parquet tables, the cache is not used.
>>>> Thus heavy operations like schema discovery are done every time a
>>>> metastore Parquet table is converted.
>>>> 2. With Parquet task-side metadata reading (which is turned on by
>>>> default), we can actually skip the row group information in the footer.
>>>> However, we accidentally called a Parquet function which doesn't skip
>>>> row group information.
>>>>
>>>> For your question about schema merging: Parquet allows different
>>>> part-files to have different but compatible schemas. For example,
>>>> part-1.parquet has columns a and b, while part-2.parquet may have
>>>> columns a and c. In some cases, the summary files (_metadata and
>>>> _common_metadata) contain the merged schema (a, b, and c), but it's not
>>>> guaranteed. For example, when the user-defined metadata stored in
>>>> different part-files contains different values for the same key,
>>>> Parquet simply gives up writing summary files. That's why all
>>>> part-files must be touched to get a precise merged schema.
>>>>
>>>> However, in scenarios where a centralized arbitrative schema is
>>>> available (e.g. the Hive metastore schema, or the schema provided by
>>>> the user via data source DDL), we don't need to do schema merging on
>>>> the driver side, but can defer it to the executor side, where each task
>>>> only needs to reconcile those part-files it touches. This is also what
>>>> the Parquet developers did recently for parquet-hadoop
>>>> <https://github.com/apache/incubator-parquet-mr/pull/45>.
>>>>
>>>> Cheng
>>>>
>>>> On 3/31/15 11:49 PM, Zheng, Xudong wrote:
>>>>
>>>> Thanks Cheng!
>>>>
>>>> Setting 'spark.sql.parquet.useDataSourceApi' to false resolves my
>>>> issues, but PR 5231 seems not to. Not sure what else I did wrong...
>>>>
>>>> BTW, we are actually very interested in the schema merging feature in
>>>> Spark 1.3, and both of these two solutions will disable this feature,
>>>> right? It seems that Parquet metadata is stored in a file named
>>>> _metadata in the Parquet file folder (each folder is a partition, as we
>>>> use a partitioned table), so why do we need to scan all Parquet
>>>> part-files? Is there any other solution that could keep the schema
>>>> merging feature at the same time? We really like this feature :)
>>>>
>>>> On Tue, Mar 31, 2015 at 3:19 PM, Cheng Lian <lian.cs@gmail.com> wrote:
>>>>
>>>>> Hi Xudong,
>>>>>
>>>>> This is probably because Parquet schema merging is turned on by
>>>>> default. This is generally useful for Parquet files with different but
>>>>> compatible schemas. But it needs to read metadata from all Parquet
>>>>> part-files. This can be problematic when reading Parquet files with
>>>>> lots of part-files, especially when the user doesn't need schema
>>>>> merging. This issue is tracked by SPARK-6575, and here is a PR for it:
>>>>> https://github.com/apache/spark/pull/5231. This PR adds a
>>>>> configuration to disable schema merging by default when doing Hive
>>>>> metastore Parquet table conversion.
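The merged-schema behavior Cheng describes can be pictured as an ordered union of the part-file column lists. This toy sketch deliberately ignores Parquet's real type reconciliation and conflict handling, and only shows why touching every part-file is needed to know the full column set:

```python
def merge_schemas(schemas):
    """Union the column lists of all part-files, preserving first-seen order.

    Each schema is a list of column names; a real implementation would also
    reconcile types and reject incompatible ones.
    """
    merged = []
    for schema in schemas:
        for col in schema:
            if col not in merged:
                merged.append(col)
    return merged


# part-1.parquet has (a, b); part-2.parquet has (a, c), as in Cheng's example
merged = merge_schemas([["a", "b"], ["a", "c"]])  # -> ["a", "b", "c"]
```

Skipping any one part-file risks missing a column only that file carries, which is exactly why the summary files (when present and trustworthy) or a centralized metastore schema let the driver avoid the full scan.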
Re: Parquet Hive table become very slow on 1.3?
We have a similar issue with massive Parquet files. Cheng Lian, could you have a look?

2015-04-08 15:47 GMT+08:00 Zheng, Xudong <dong...@gmail.com>:

> Hi Cheng,
>
> I tried both these patches, and it seems they still don't resolve my
> issue. I found that most of the time is spent on this line in
> newParquet.scala:
>
> ParquetFileReader.readAllFootersInParallel(
>   sparkContext.hadoopConfiguration, seqAsJavaList(leaves), taskSideMetaData)
>
> which needs to read all the files under the Parquet folder, while our
> Parquet folder has a lot of Parquet files (near 2000); reading one file
> takes about 2 seconds, so it becomes very slow... And PR 5231 does not
> skip this step, so it does not resolve my issue.
>
> As our Parquet files are generated by a Spark job, the number of .parquet
> files is the same as the number of tasks, which is why we have so many
> files. But these files actually have the same schema. Is there any way to
> merge these files into one, or avoid scanning each of them?
>
> On Sat, Apr 4, 2015 at 9:47 PM, Cheng Lian <lian.cs@gmail.com> wrote:
>
>> Hey Xudong,
>>
>> We had been digging into this issue for a while, and believe PR 5339
>> <http://github.com/apache/spark/pull/5339> and PR 5334
>> <http://github.com/apache/spark/pull/5334> should fix it. There are two
>> problems:
>>
>> 1. Normally we cache Parquet table metadata for better performance, but
>> when converting Hive metastore Parquet tables, the cache is not used.
>> Thus heavy operations like schema discovery are done every time a
>> metastore Parquet table is converted.
>> 2. With Parquet task-side metadata reading (which is turned on by
>> default), we can actually skip the row group information in the footer.
>> However, we accidentally called a Parquet function which doesn't skip
>> row group information.
>>
>> For your question about schema merging: Parquet allows different
>> part-files to have different but compatible schemas. For example,
>> part-1.parquet has columns a and b, while part-2.parquet may have
>> columns a and c. In some cases, the summary files (_metadata and
>> _common_metadata) contain the merged schema (a, b, and c), but it's not
>> guaranteed. For example, when the user-defined metadata stored in
>> different part-files contains different values for the same key, Parquet
>> simply gives up writing summary files. That's why all part-files must be
>> touched to get a precise merged schema.
>>
>> However, in scenarios where a centralized arbitrative schema is
>> available (e.g. the Hive metastore schema, or the schema provided by the
>> user via data source DDL), we don't need to do schema merging on the
>> driver side, but can defer it to the executor side, where each task only
>> needs to reconcile those part-files it touches. This is also what the
>> Parquet developers did recently for parquet-hadoop
>> <https://github.com/apache/incubator-parquet-mr/pull/45>.
>>
>> Cheng
>>
>> On 3/31/15 11:49 PM, Zheng, Xudong wrote:
>>
>> Thanks Cheng!
>>
>> Setting 'spark.sql.parquet.useDataSourceApi' to false resolves my
>> issues, but PR 5231 seems not to. Not sure what else I did wrong...
>>
>> BTW, we are actually very interested in the schema merging feature in
>> Spark 1.3, and both of these two solutions will disable this feature,
>> right? It seems that Parquet metadata is stored in a file named
>> _metadata in the Parquet file folder (each folder is a partition, as we
>> use a partitioned table), so why do we need to scan all Parquet
>> part-files? Is there any other solution that could keep the schema
>> merging feature at the same time? We really like this feature :)
>>
>> On Tue, Mar 31, 2015 at 3:19 PM, Cheng Lian <lian.cs@gmail.com> wrote:
>>
>>> Hi Xudong,
>>>
>>> This is probably because Parquet schema merging is turned on by
>>> default. This is generally useful for Parquet files with different but
>>> compatible schemas. But it needs to read metadata from all Parquet
>>> part-files. This can be problematic when reading Parquet files with
>>> lots of part-files, especially when the user doesn't need schema
>>> merging. This issue is tracked by SPARK-6575, and here is a PR for it:
>>> https://github.com/apache/spark/pull/5231. This PR adds a configuration
>>> to disable schema merging by default when doing Hive metastore Parquet
>>> table conversion. Another workaround is to fall back to the old Parquet
>>> code path by setting spark.sql.parquet.useDataSourceApi to false.
>>>
>>> Cheng
>>>
>>> On 3/31/15 2:47 PM, Zheng, Xudong wrote:
>>>
>>> Hi all,
>>>
>>> We are using a Parquet Hive table, and we are upgrading to Spark 1.3.
>>> But we find that just a simple COUNT(*) query is much slower (100x)
>>> than on Spark 1.2. I find most of the time is spent on the driver
>>> getting HDFS blocks. I see a large number of logs like the below
>>> printed:
>>>
>>> 15/03/30 23:03:43 DEBUG ProtobufRpcEngine: Call: getBlockLocations took 2097ms
>>> 15/03/30 23:03:43 DEBUG DFSClient: newInfo = LocatedBlocks{
>>>   fileLength=77153436
>>>   underConstruction=false
>>>   blocks=[LocatedBlock{BP-1236294426-10.152.90.181-1425290838173:blk_1075187948_1448275;
>>>     getBlockSize()=77153436; corrupt=false; offset=0;
>>>     locs=[10.152.116.172:50010, 10.152.116.169:50010, 10.153.125.184:50010]}]
Issue of sqlContext.createExternalTable with parquet partition discovery after changing folder structure
Hi Spark Users,

I'm testing the 1.3 new feature of Parquet partition discovery. I have 2 subfolders, each with 800 rows:

/data/table1/key=1
/data/table1/key=2

In spark-shell, I run these commands:

val t = sqlContext.createExternalTable("table1", "hdfs:///data/table1", "parquet")
t.count

It shows 1600 successfully. But after that, I add a new folder /data/table1/key=3 and run t.count again; it still gives me 1600, not 2400. If I restart spark-shell and run

val t = sqlContext.table("table1")
t.count

it's 2400 now. I suspect there is a partition cache in the driver; I tried setting spark.sql.parquet.cacheMetadata to false and testing again, but unfortunately it doesn't help. How can I disable this partition cache or force a refresh of the cache? Thanks
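The behavior described is consistent with a driver-side cache of the partition listing that is populated on first access and only dropped on an explicit invalidation (in later Spark versions, `sqlContext.refreshTable("table1")` does exactly this; whether your build exposes it is worth checking). A toy model of that caching pattern, independent of Spark:

```python
class PartitionCache:
    """Toy model of a driver-side partition-listing cache going stale."""

    def __init__(self, list_fn):
        self._list_fn = list_fn   # e.g. a filesystem listStatus call
        self._cached = None

    def partitions(self):
        if self._cached is None:          # listed once, then reused
            self._cached = self._list_fn()
        return self._cached

    def refresh(self):                    # analogous to an explicit refreshTable
        self._cached = None


folders = ["key=1", "key=2"]
cache = PartitionCache(lambda: list(folders))
cache.partitions()               # first access snapshots the listing

folders.append("key=3")          # a new folder appears after the first listing
stale = cache.partitions()       # still the old two-folder view
cache.refresh()
fresh = cache.partitions()       # now sees key=3
```

This also explains why a fresh spark-shell sees 2400 rows: a new process starts with an empty cache and re-lists the folders.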
Parquet timestamp support for Hive?
Hi, I got this error when creating a Hive table from a Parquet file:

DDLTask: org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.UnsupportedOperationException: Parquet does not support timestamp. See HIVE-6384

I checked HIVE-6384; it's fixed in 0.14. The Hive in the Spark build is a customized version, 0.13.1a (GroupId: org.spark-project.hive). Is it possible to get the source code for it and apply the patch from HIVE-6384? Thanks
Return jobid for a hive query?
Hi there, I have an app talking to the Spark Hive Thrift Server using Hive ODBC; querying works fine. But through this interface, I can't get many running details when my query goes wrong; only a single error message is shown. I want to get the job id for my query, so that I can go to the Application Detail UI to see what's going on. Is there a way to achieve this? Or do I need some customization in the error-throwing path? Thanks