[ 
https://issues.apache.org/jira/browse/SPARK-22599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nan Zhu updated SPARK-22599:
----------------------------
    Description: 
In the current implementation of Spark, InMemoryTableExec reads all data in a 
cached table, filters CachedBatches according to their stats, and passes the 
surviving data to the downstream operators. This implementation makes it 
inefficient to keep a whole table in memory in order to serve various queries 
against different partitions of the table, a pattern that covers a significant 
share of our users' scenarios.

The following is an example of such a use case:

store_sales is a 1TB table in cloud storage, partitioned by 'location'. The 
first query, Q1, outputs several metrics A, B, and C for all stores in all 
locations. After that, a small team of 3 data scientists wants to do some 
causal analysis of the sales in different locations. To avoid unnecessary I/O 
and parquet/orc parsing overhead, they cache the whole table in memory as part 
of Q1.

With the current implementation, even though each data scientist is interested 
in only one of the three locations, every query they submit to the Spark 
cluster still reads the full 1TB of data.
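
In DataFrame terms the workflow looks roughly like this (a sketch only; the 
metric column "amount" and the literal location "CA" are made up for 
illustration):

{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.sum

val spark = SparkSession.builder().appName("cached-table-example").getOrCreate()
import spark.implicits._

// Q1: cache the whole 1TB table and compute the metrics over all locations.
val sales = spark.table("store_sales").cache()
sales.groupBy($"location").agg(sum($"amount")).show()

// A later query cares about a single location, yet the in-memory scan still
// reads every CachedBatch of the whole table to evaluate the filter.
sales.filter($"location" === "CA").agg(sum($"amount")).show()
{code}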

The reason behind the extra reading is that CachedBatch is implemented as

{code:scala}
case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)
{code}

where "stats" is a part of every CachedBatch, so we can only filter batches for 
output of InMemoryTableExec operator by reading all data in in-memory table as 
input. The extra reading would be even more unacceptable when some of the 
table's data is evicted to disks.
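
To make the constraint concrete, here is a minimal sketch of the current 
pattern (simplified; statsFilter stands in for the real batch-pruning 
predicate):

{code:scala}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow

case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)

// Because the stats live inside each CachedBatch, every batch has to be
// materialized (fetched from the BlockManager, possibly from disk) before
// its stats can be inspected; pruning only happens after the read is paid for.
def filteredCachedBatches(
    cached: RDD[CachedBatch],
    statsFilter: InternalRow => Boolean): RDD[CachedBatch] =
  cached.mapPartitions { batches =>
    batches.filter(batch => statsFilter(batch.stats))
  }
{code}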

We propose to introduce a new type of block, a metadata block, for the 
partitions of the RDD representing the data in the cached table. Every metadata 
block contains the stats info for all columns in a partition and is saved to 
the BlockManager when the partition's compute() method is executed. To minimize 
the number of bytes read, InMemoryTableExec can then consult these small 
metadata blocks to prune partitions before fetching any of their data blocks.
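
As a rough sketch of the proposed shape (the names PartitionStatsBlock and 
prunePartitions are hypothetical, not taken from the design doc):

{code:scala}
import org.apache.spark.sql.catalyst.InternalRow

// Hypothetical metadata block: one per cached partition, carrying only the
// column-level stats, so it is tiny compared to the data blocks it describes.
case class PartitionStatsBlock(partitionId: Int, columnStats: InternalRow)

// Pruning consults only the small metadata blocks; the data blocks of
// partitions that cannot match the predicate are never fetched.
def prunePartitions(
    metaBlocks: Seq[PartitionStatsBlock],
    statsFilter: InternalRow => Boolean): Seq[Int] =
  metaBlocks.filter(b => statsFilter(b.columnStats)).map(_.partitionId)
{code}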

More details can be found in the design doc: 
https://docs.google.com/document/d/1DSiP3ej7Wd2cWUPVrgqAtvxbSlu5_1ZZB6m_2t8_95Q/edit?usp=sharing



> Avoid extra reading for cached table
> ------------------------------------
>
>                 Key: SPARK-22599
>                 URL: https://issues.apache.org/jira/browse/SPARK-22599
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.2.0
>            Reporter: Nan Zhu
>


