[ 
https://issues.apache.org/jira/browse/SPARK-22599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16303022#comment-16303022
 ] 

Nan Zhu commented on SPARK-22599:
---------------------------------

[~rajesh.balamohan] no, it means that both the SPARK-22599 and master runs are reading 
in-memory (cached) data

"parquet" means that it reads the Parquet files from the storage system directly
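
In other words (a minimal sketch; the table name and path are assumptions for 
illustration, and "spark" is an existing SparkSession):

{code}
// "in-memory" rows: the table is cached first, so queries scan the cached batches
spark.read.parquet("/data/tpcds/store_sales")      // hypothetical path
  .createOrReplaceTempView("store_sales")
spark.table("store_sales").cache().count()         // materialize the cache before timing

// "parquet" rows: no cache() call, so every query reads the Parquet files directly
{code}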

> Avoid extra reading for cached table
> ------------------------------------
>
>                 Key: SPARK-22599
>                 URL: https://issues.apache.org/jira/browse/SPARK-22599
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.2.0
>            Reporter: Nan Zhu
>
> In the current implementation of Spark, InMemoryTableExec reads all the data in a 
> cached table, filters CachedBatches according to their stats, and passes the data to 
> the downstream operators. This implementation makes it inefficient to keep a whole 
> table in memory to serve various queries against different partitions of the table, 
> which accounts for a significant portion of our users' scenarios.
> The following is an example of such a use case:
> store_sales is a 1TB table in cloud storage, partitioned by 'location'. The first 
> query, Q1, outputs several metrics A, B, C for all stores in all locations. After 
> that, a small team of 3 data scientists wants to do some causal analysis of the 
> sales in different locations. To avoid unnecessary I/O and Parquet/ORC parsing 
> overhead, they cache the whole table in memory in Q1.
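> A minimal sketch of this workflow (the path and column names are illustrative 
> assumptions, and "spark" is an existing SparkSession):
> {code}
> // Q1: cache the whole table once and compute the global metrics
> spark.read.parquet("s3://warehouse/store_sales")    // 1TB, partitioned by 'location'
>   .createOrReplaceTempView("store_sales")
> spark.table("store_sales").cache()
> spark.sql("SELECT location, sum(sales) AS a, avg(quantity) AS b, count(*) AS c " +
>   "FROM store_sales GROUP BY location").show()
> 
> // later: each data scientist needs only one location, but the scan below still
> // reads every CachedBatch of the 1TB table just to evaluate the filter
> spark.sql("SELECT * FROM store_sales WHERE location = 'CA'").show()
> {code}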
> With the current implementation, even though each data scientist is only interested 
> in one of the three locations, the queries they submit to the Spark cluster still 
> read the full 1TB of data.
> The reason behind the extra reading operation is that we implement 
> CachedBatch as
> {code}
> case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: 
> InternalRow)
> {code}
> where "stats" is part of every CachedBatch, so the only way to filter batches for the 
> output of the InMemoryTableExec operator is to read all the data in the in-memory 
> table as input. The extra reading is even more unacceptable when some of the table's 
> data has been evicted to disk.
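> To make the current behaviour concrete, here is a rough sketch of the scan path 
> (paraphrased from InMemoryTableScanExec, not a verbatim copy; the helper name is an 
> assumption):
> {code}
> import org.apache.spark.rdd.RDD
> import org.apache.spark.sql.catalyst.InternalRow
> 
> // mirrors the CachedBatch definition above: the stats travel inside each batch
> case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)
> 
> // every CachedBatch in a partition must be materialized just to look at its stats,
> // even if the stats then say the batch can be skipped
> def filterCachedBatches(cached: RDD[CachedBatch],
>                         batchFilter: InternalRow => Boolean): RDD[CachedBatch] =
>   cached.mapPartitions(batches => batches.filter(b => batchFilter(b.stats)))
> {code}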
> We propose to introduce a new type of block, a metadata block, for the partitions of 
> the RDD representing the data in the cached table. Every metadata block contains the 
> stats for all columns in a partition and is saved to the BlockManager when the 
> compute() method runs for that partition, so that batches can be filtered by reading 
> only the metadata blocks, minimizing the number of bytes read.
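> As a rough illustration of the idea (the names below are hypothetical, not taken 
> from the patch; see the design doc for the actual proposal):
> {code}
> import org.apache.spark.sql.catalyst.InternalRow
> 
> // hypothetical shape of a metadata block: per-batch stats for one partition,
> // stored in the BlockManager separately from the data blocks
> case class CachedPartitionMetadata(partitionIndex: Int, batchStats: Array[InternalRow])
> 
> // intended effect: evaluate the batch filter against the metadata block first and
> // fetch only the CachedBatch blocks whose stats pass, instead of every batch
> def selectBatches(meta: CachedPartitionMetadata,
>                   batchFilter: InternalRow => Boolean): Seq[Int] =
>   meta.batchStats.zipWithIndex.collect { case (stats, i) if batchFilter(stats) => i }.toSeq
> {code}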
> More details can be found in the design doc: 
> https://docs.google.com/document/d/1DSiP3ej7Wd2cWUPVrgqAtvxbSlu5_1ZZB6m_2t8_95Q/edit?usp=sharing
> Performance test results:
> Environment: 6 executors, each with 16 cores and 90 GB of memory
> Dataset: 1 TB of TPC-DS data
> Queries: 4 queries (Q19, Q46, Q34, Q27) from 
> https://github.com/databricks/spark-sql-perf/blob/c2224f37e50628c5c8691be69414ec7f5a3d919a/src/main/scala/com/databricks/spark/sql/perf/tpcds/ImpalaKitQueries.scala
> Results: 
> https://docs.google.com/spreadsheets/d/1A20LxqZzAxMjW7ptAJZF4hMBaHxKGk3TBEQoAJXfzCI/edit?usp=sharing



