These numbers were run on git commit 756c96 (a few days after the 1.0.0-rc3 tag). Do you have a link to the patch that avoids scanning all columns for count(*) or count(1)? I'd like to give it a shot.

Andrew

On Mon, May 12, 2014 at 11:41 PM, Reynold Xin <r...@databricks.com> wrote:

> Thanks for the experiments and analysis!
>
> I think Michael already submitted a patch that avoids scanning all columns for count(*) or count(1).
>
>
> On Mon, May 12, 2014 at 9:46 PM, Andrew Ash <and...@andrewash.com> wrote:
>
> > Hi Spark devs,
> >
> > First of all, huge congrats on the Parquet integration with SparkSQL! This is an incredible direction forward and something I can see being very broadly useful.
> >
> > I was doing some preliminary tests to see how it works with one of my workflows, and wanted to share some numbers that people might want to know about.
> >
> > I also wanted to point out that .count() doesn't seem integrated with the rest of the optimization framework, and some big gains could be possible.
> >
> >
> > So, the numbers:
> >
> > I took a table extracted from a SQL database and stored in HDFS:
> >
> > - 115 columns (several always-empty, mostly strings, some enums, some numbers)
> > - 253,887,080 rows
> > - 182,150,295,881 bytes (raw uncompressed)
> > - 42,826,820,222 bytes (lzo compressed with .index file)
> >
> > And I converted it to Parquet using SparkSQL's SchemaRDD.saveAsParquetFile() call:
> >
> > - Converting from .lzo in HDFS to .parquet in HDFS took 635s using 42 cores across 4 machines
> > - 17,517,922,117 bytes (Parquet per SparkSQL defaults)
> >
> > So storing in Parquet format vs lzo compresses the data down to less than 50% of the .lzo size, and under 10% of the raw uncompressed size. Nice!
> >
> >
> > I then did some basic interactions on it:
> >
> > *Row count*
> >
> > - LZO
> >   - lzoFile("/path/to/lzo").count
> >   - 31.632305953s
> > - Parquet
> >   - sqlContext.parquetFile("/path/to/parquet").count
> >   - 289.129487003s
> >
> > Reassembling rows from the separate column storage is clearly really expensive. Median task length is 33s vs 4s, and of that 33s in each task (319 tasks total) about 1.75 seconds are spent in GC (inefficient object allocation?).
> >
> >
> > *Count number of rows with a particular key:*
> >
> > - LZO
> >   - lzoFile("/path/to/lzo").filter(_.split("\\|")(0) == "1234567890").count
> >   - 73.988897511s
> > - Parquet
> >   - sqlContext.parquetFile("/path/to/parquet").where('COL === 1234567890).count
> >   - 293.410470418s
> > - Parquet (hand-tuned to count on just one column)
> >   - sqlContext.parquetFile("/path/to/parquet").where('COL === 1234567890).select('IDCOL).count
> >   - 1.160449187s
> >
> > It looks like .count() on Parquet is currently handled very inefficiently: all the columns are materialized. But if I select just the relevant column and then count, the column-oriented storage of Parquet really shines.
> >
> > There's a potential optimization here: a .count() on a SchemaRDD backed by Parquet shouldn't require reassembling the rows, as that's expensive. I don't think .count() is handled specially in SchemaRDDs, but it seems ripe for optimization.
> >
> >
> > *Count number of distinct values in a column*
> >
> > - LZO
> >   - lzoFile("/path/to/lzo").map(sel(0)).distinct.count
> >   - 115.582916866s
> > - Parquet
> >   - sqlContext.parquetFile("/path/to/parquet").select('COL).distinct.count
> >   - 16.839004826s
> >
> > It turns out column selectivity is very useful! I'm guessing that if I could get byte counts read out of HDFS, that would just about match up with the difference in read times.
> >
> > Any thoughts on how to embed the knowledge of my hand-tuned additional .select('IDCOL) into Catalyst?
> >
> > Thanks again for all the hard work and prep for the 1.0 release!
> >
> > Andrew
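
[For context, a minimal sketch of the LZO-to-Parquet conversion step described in the quoted message, in the Spark 1.0-era shell API. The lzoFile helper is a hypothetical reconstruction of the one used in the thread (via hadoop-lzo's input format), and the two-column Record stand-in elides the real 115-column schema, which wouldn't fit in a single Scala 2.10 case class anyway given its 22-field limit:]

    import org.apache.hadoop.io.{LongWritable, Text}
    import com.hadoop.mapreduce.LzoTextInputFormat  // from the hadoop-lzo package
    import org.apache.spark.sql.SQLContext

    val sqlContext = new SQLContext(sc)
    import sqlContext.createSchemaRDD  // implicit RDD[case class] -> SchemaRDD

    // Hypothetical reconstruction of the thread's lzoFile(...) helper:
    // read .lzo text splits (using the .index file when present) as lines
    def lzoFile(path: String) =
      sc.newAPIHadoopFile[LongWritable, Text, LzoTextInputFormat](path).map(_._2.toString)

    // Two-column stand-in for the real 115-column row type
    case class Record(idcol: String, col: Long)

    // Parse the pipe-delimited rows and write them out as Parquet
    // with SparkSQL's default settings
    val records = lzoFile("/path/to/lzo")
      .map(_.split("\\|"))
      .map(f => Record(f(0), f(1).toLong))
    records.saveAsParquetFile("/path/to/parquet")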
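[And until a count(*) patch lands, the hand-tuned trick above generalizes into a small helper. This is only a sketch of the manual workaround, not of what Catalyst itself would do; prunedCount is a made-up name, and it assumes the shell's sqlContext is in scope for the Symbol-to-attribute implicits:]

    import org.apache.spark.sql.SchemaRDD
    import sqlContext._  // Symbol -> attribute implicits used by select()

    // Count rows while reading only one narrow column from Parquet.
    // Projecting first means the scan touches a single column's pages
    // instead of reassembling all 115 columns for every row.
    def prunedCount(rdd: SchemaRDD, narrowCol: Symbol): Long =
      rdd.select(narrowCol).count()

    // e.g. the hand-tuned query from the thread becomes:
    prunedCount(
      sqlContext.parquetFile("/path/to/parquet").where('COL === 1234567890),
      'IDCOL)

[Presumably the patch Michael submitted does the equivalent projection automatically inside the optimizer, so a plain .count() picks a cheap column on its own.]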