Is it possible to set the number of cores per executor on standalone cluster?

2015-07-17 Thread Zheng, Xudong
Is it possible to set the number of cores per executor on standalone
cluster?

We find that the distribution of cores across executors can be very skewed at
times, so the workload is skewed as well, which makes our job slow.
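
To make the question concrete, here is a hedged sketch of the configuration we
have in mind (the master URL and numbers are made up, and whether the
standalone master honors spark.executor.cores depends on the Spark version):

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical settings: spark.cores.max caps the total cores of the
// application, while spark.executor.cores would cap each individual executor.
val conf = new SparkConf()
  .setAppName("cores-per-executor-example")
  .setMaster("spark://master:7077")   // made-up standalone master URL
  .set("spark.cores.max", "16")       // total cores for the application
  .set("spark.executor.cores", "4")   // desired cores per executor, if supported
val sc = new SparkContext(conf)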

Thanks!

-- 
郑旭东
Zheng, Xudong


Re: Parquet Hive table become very slow on 1.3?

2015-04-08 Thread Zheng, Xudong
Hi Cheng,

I tried both of these patches, but they still do not seem to resolve my issue.
I found that most of the time is spent on this line in newParquet.scala:

ParquetFileReader.readAllFootersInParallel(
  sparkContext.hadoopConfiguration, seqAsJavaList(leaves), taskSideMetaData)

This needs to read all the files under the Parquet folder. Our Parquet folder
contains a lot of Parquet files (nearly 2000), and reading one file takes about
2 seconds, so it becomes very slow ... PR 5231 does not skip this step, so it
does not resolve my issue.

Our Parquet files are generated by a Spark job, so the number of .parquet
files is the same as the number of tasks; that is why we have so many files.
But these files actually have the same schema. Is there any way to merge these
files into one, or to avoid scanning each of them?
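
As an illustration of the kind of merge I have in mind, here is a hedged
sketch that rewrites the many part-files into fewer ones (Spark 1.3 era API;
the paths and the partition count are made up, and DataFrame.repartition is
assumed to be available):

val df = sqlContext.parquetFile("data/myparquettable")
// Rewrite into far fewer part-files; 64 is an arbitrary example.
df.repartition(64).saveAsParquetFile("data/myparquettable_merged")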

On Sat, Apr 4, 2015 at 9:47 PM, Cheng Lian lian.cs@gmail.com wrote:

  Hey Xudong,

 We have been digging into this issue for a while, and believe PR 5339
 https://github.com/apache/spark/pull/5339 and PR 5334
 https://github.com/apache/spark/pull/5334 should fix it.

 There are two problems:

 1. Normally we cache Parquet table metadata for better performance, but
 when converting Hive metastore Parquet tables, the cache is not used. Thus
 heavy operations like schema discovery are done every time a metastore
 Parquet table is converted.
 2. With Parquet task-side metadata reading (which is turned on by
 default), we can actually skip the row group information in the footer.
 However, we accidentally called a Parquet function which doesn't skip row
 group information.
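
 (For what it's worth, the task-side metadata switch in point 2 is a plain
 Hadoop configuration key from parquet-hadoop, and it is already true by
 default; the line below is only an illustration of where it lives:)

 // Illustration only: parquet-hadoop's task-side metadata flag, set on the
 // Hadoop configuration that Spark hands to the Parquet input format.
 sc.hadoopConfiguration.setBoolean("parquet.task.side.metadata", true)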

 For your question about schema merging, Parquet allows different
 part-files to have different but compatible schemas. For example,
 part-1.parquet has columns a and b, while part-2.parquet may have
 columns a and c. In some cases, the summary files (_metadata and
 _common_metadata) contain the merged schema (a, b, and c), but this is not
 guaranteed. For example, when the user-defined metadata stored in different
 part-files contains different values for the same key, Parquet simply gives
 up writing summary files. That's why all part-files must be touched to get
 a precise merged schema.
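
 As a concrete (made-up) illustration of the compatible-schema case above,
 using the Spark 1.3 data source API:

 import sqlContext.implicits._
 // Two part-file sets with different but compatible schemas.
 sc.parallelize(Seq((1, "x"))).toDF("a", "b").saveAsParquetFile("data/t/p=1")
 sc.parallelize(Seq((2, 3.0))).toDF("a", "c").saveAsParquetFile("data/t/p=2")
 // Reading the root folder merges the two schemas into (a, b, c), plus the
 // discovered partition column p.
 sqlContext.parquetFile("data/t").printSchema()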

 However, in scenarios where a centralized authoritative schema is available
 (e.g. the Hive metastore schema, or the schema provided by the user via data
 source DDL), we don't need to do schema merging on the driver side; we can
 defer it to the executor side, where each task only needs to reconcile the
 part-files it actually touches. This is also what the Parquet developers did
 recently for parquet-hadoop
 https://github.com/apache/incubator-parquet-mr/pull/45.

 Cheng


 On 3/31/15 11:49 PM, Zheng, Xudong wrote:

 Thanks Cheng!

 Setting 'spark.sql.parquet.useDataSourceApi' to false resolves my issue,
 but PR 5231 does not seem to. Not sure whether I did something else wrong ...

 BTW, we are actually very interested in the schema merging feature in
 Spark 1.3, and both of these solutions will disable that feature, right? It
 seems that the Parquet metadata is stored in a file named _metadata in the
 Parquet folder (each folder is a partition, as we use a partitioned table),
 so why do we need to scan all the Parquet part-files? Is there any other
 solution that keeps the schema merging feature at the same time? We really
 like this feature :)

 On Tue, Mar 31, 2015 at 3:19 PM, Cheng Lian lian.cs@gmail.com wrote:

  Hi Xudong,

 This is probably because Parquet schema merging is turned on by default.
 It is generally useful for Parquet files with different but compatible
 schemas, but it needs to read metadata from all Parquet part-files. This can
 be problematic when reading Parquet files with lots of part-files, especially
 when the user doesn't need schema merging.

 This issue is tracked by SPARK-6575, and here is a PR for it:
 https://github.com/apache/spark/pull/5231. This PR adds a configuration
 to disable schema merging by default when doing Hive metastore Parquet
 table conversion.

 Another workaround is to fall back to the old Parquet code path by setting
 spark.sql.parquet.useDataSourceApi to false.
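
 For example (a minimal sketch; the flag is the one named above, set through
 SQLContext):

 // Fall back to the pre-1.3 Parquet code path for Hive metastore tables.
 sqlContext.setConf("spark.sql.parquet.useDataSourceApi", "false")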

 Cheng


 On 3/31/15 2:47 PM, Zheng, Xudong wrote:

 Hi all,

 We are using a Parquet Hive table, and we are upgrading to Spark 1.3. But
 we find that even a simple COUNT(*) query is much slower (100x) than on
 Spark 1.2.

 I find that most of the time is spent on the driver getting HDFS blocks. I
 see a large number of the following log messages printed:

 15/03/30 23:03:43 DEBUG ProtobufRpcEngine: Call: getBlockLocations took 2097ms
 15/03/30 23:03:43 DEBUG DFSClient: newInfo = LocatedBlocks{
   fileLength=77153436
   underConstruction=false
   blocks=[LocatedBlock{BP-1236294426-10.152.90.181-1425290838173:blk_1075187948_1448275; getBlockSize()=77153436; corrupt=false; offset=0; locs=[10.152.116.172:50010, 10.152.116.169:50010, 10.153.125.184:50010]}]
   lastLocatedBlock=LocatedBlock{BP-1236294426-10.152.90.181-1425290838173:blk_1075187948_1448275; getBlockSize()=77153436; corrupt=false; offset=0; locs=[10.152.116.169

Parquet Hive table become very slow on 1.3?

2015-03-31 Thread Zheng, Xudong
Hi all,

We are using a Parquet Hive table, and we are upgrading to Spark 1.3. But we
find that even a simple COUNT(*) query is much slower (100x) than on Spark
1.2.

I find that most of the time is spent on the driver getting HDFS blocks. I
see a large number of the following log messages printed:

15/03/30 23:03:43 DEBUG ProtobufRpcEngine: Call: getBlockLocations took 2097ms
15/03/30 23:03:43 DEBUG DFSClient: newInfo = LocatedBlocks{
  fileLength=77153436
  underConstruction=false
  blocks=[LocatedBlock{BP-1236294426-10.152.90.181-1425290838173:blk_1075187948_1448275; getBlockSize()=77153436; corrupt=false; offset=0; locs=[10.152.116.172:50010, 10.152.116.169:50010, 10.153.125.184:50010]}]
  lastLocatedBlock=LocatedBlock{BP-1236294426-10.152.90.181-1425290838173:blk_1075187948_1448275; getBlockSize()=77153436; corrupt=false; offset=0; locs=[10.152.116.169:50010, 10.153.125.184:50010, 10.152.116.172:50010]}
  isLastBlockComplete=true}
15/03/30 23:03:43 DEBUG DFSClient: Connecting to datanode 10.152.116.172:50010


I compared the printed logs with Spark 1.2: although the number of
getBlockLocations calls is similar, each such operation only cost 20~30 ms
there (it is 2000~3000 ms now), and it didn't print the detailed
LocatedBlocks info.

Another finding is that if I read the Parquet file via Scala code from
spark-shell as below, it looks fine, and the computation returns the result
as quickly as before.

sqlContext.parquetFile("data/myparquettable")


Any idea about it? Thank you!


-- 
郑旭东
Zheng, Xudong