Thanks for the experiments and analysis!

I think Michael already submitted a patch that avoids scanning all columns
for count(*) or count(1).
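
For reference, a minimal sketch of the kind of query that should benefit once
that lands (the table name is made up, and this assumes the 1.0-era
SQLContext/SchemaRDD API):

    val events = sqlContext.parquetFile("/path/to/parquet")
    events.registerAsTable("events")

    // With the column-pruning fix, a bare row count like this should no
    // longer need to materialize every column of the Parquet file.
    sqlContext.sql("SELECT COUNT(1) FROM events").collect()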


On Mon, May 12, 2014 at 9:46 PM, Andrew Ash <and...@andrewash.com> wrote:

> Hi Spark devs,
>
> First of all, huge congrats on the parquet integration with SparkSQL!  This
> is an incredible direction forward and something I can see being very
> broadly useful.
>
> I was doing some preliminary tests to see how it works with one of my
> workflows, and wanted to share some numbers that people might want to know
> about.
>
> I also wanted to point out that .count() doesn't seem integrated with the
> rest of the optimization framework, and some big gains could be possible.
>
>
> So, the numbers:
>
> I took a table extracted from a SQL database and stored in HDFS:
>
>    - 115 columns (several always-empty, mostly strings, some enums, some
>    numbers)
>    - 253,887,080 rows
>    - 182,150,295,881 bytes (raw uncompressed)
>    - 42,826,820,222 bytes (lzo compressed with .index file)
>
> And I converted it to Parquet using SparkSQL's SchemaRDD.saveAsParquetFile()
> call (sketched in code below):
>
>    - Converting from .lzo in HDFS to .parquet in HDFS took 635s using 42
>    cores across 4 machines
>    - 17,517,922,117 bytes (parquet per SparkSQL defaults)
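>
> In code the conversion was roughly this shape (the case class and column
> names below are made-up stand-ins for the real 115-column record type):
>
>     import sqlContext.createSchemaRDD   // implicit RDD[Product] -> SchemaRDD
>
>     // Hypothetical two-column stand-in for the real record type
>     case class Record(idcol: Long, col: String)
>
>     val records = lzoFile("/path/to/lzo")   // RDD[String], '|'-delimited lines
>       .map(_.split("\\|"))
>       .map(f => Record(f(0).toLong, f(1)))
>
>     records.saveAsParquetFile("/path/to/parquet")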
>
> So storing in Parquet format vs LZO compresses the data down to about 41%
> of the .lzo size, and to under 10% (about 9.6%) of the raw uncompressed
> size. Nice!
>
>
> I then did some basic interactions on it:
>
> *Row count*
>
>    - LZO
>       - lzoFile("/path/to/lzo").count
>       - 31.632305953s
>    - Parquet
>       - sqlContext.parquetFile("/path/to/parquet").count
>       - 289.129487003s
>
> Reassembling rows from the separate column storage is clearly very
> expensive. Median task length is 33s for Parquet vs 4s for LZO, and of
> that 33s per task (319 tasks total) about 1.75 seconds is spent in GC
> (inefficient object allocation?).
>
>
>
> *Count number of rows with a particular key:*
>
>    - LZO
>       - lzoFile("/path/to/lzo").filter(_.split("\\|")(0) == "1234567890").count
>       - 73.988897511s
>    - Parquet
>       - sqlContext.parquetFile("/path/to/parquet").where('COL === 1234567890).count
>       - 293.410470418s
>    - Parquet (hand-tuned to count on just one column)
>       - sqlContext.parquetFile("/path/to/parquet").where('COL === 1234567890).select('IDCOL).count
>       - 1.160449187s
>
> It looks like .count() on Parquet is currently handled very inefficiently:
> all of the columns are materialized. But if I select just the relevant
> column first and then count, the column-oriented storage of Parquet really
> shines.
>
> There ought to be an optimization here so that a .count() on a SchemaRDD
> backed by Parquet doesn't require reassembling full rows, since that's
> expensive. I don't think .count() is handled specially in SchemaRDDs, but
> it seems ripe for optimization.
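>
> Concretely, I'd want the first form below to end up being executed more or
> less like the second ('IDCOL here standing in for any narrow column):
>
>     // what I'd like to write:
>     sqlContext.parquetFile("/path/to/parquet").count
>
>     // what it could effectively execute as (scan a single narrow column):
>     sqlContext.parquetFile("/path/to/parquet").select('IDCOL).count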
>
>
> *Count number of distinct values in a column*
>
>    - LZO
>       - lzoFile("/path/to/lzo").map(sel(0)).distinct.count
>       - 115.582916866s
>    - Parquet
>       - sqlContext.parquetFile("/path/to/parquet").select('COL).distinct.count
>       - 16.839004826s
>
> It turns out column selectivity is very useful!  I'm guessing that if I
> could get byte counts read out of HDFS, that would just about match up with
> the difference in read times.
>
>
>
>
> Any thoughts on how to embed the knowledge of my hand-tuned additional
> .select('IDCOL) into Catalyst?
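>
> Something like this is what I'm imagining -- a rough, untested sketch
> against what I think the Catalyst API looks like (the rule name and the
> exact guard are just mine):
>
>     import org.apache.spark.sql.catalyst.plans.logical._
>     import org.apache.spark.sql.catalyst.rules.Rule
>
>     // For a grouping-free Aggregate whose expressions reference no columns
>     // (e.g. COUNT(1)), keep only one narrow column beneath it, so the
>     // column-pruned Parquet scan reads a single column instead of all 115.
>     object NarrowCountInput extends Rule[LogicalPlan] {
>       def apply(plan: LogicalPlan): LogicalPlan = plan transform {
>         case agg @ Aggregate(Nil, aggExprs, child)
>             if child.output.size > 1 && aggExprs.forall(_.references.isEmpty) =>
>           agg.copy(child = Project(child.output.take(1), child))
>       }
>     }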
>
>
> Thanks again for all the hard work and prep for the 1.0 release!
>
> Andrew
>
