[ https://issues.apache.org/jira/browse/SPARK-35274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

xiaoli updated SPARK-35274:
---------------------------
    Description: 
I asked this question [before|https://issues.apache.org/jira/browse/SPARK-35190], but perhaps I did not state it clearly enough, so it went unanswered. This time I will show an example that illustrates the question clearly.
{code:java}
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("OrcTest").getOrCreate()

// Accumulate bytes read by all finished tasks.
var inputBytes = 0L
spark.sparkContext.addSparkListener(new SparkListener() {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val metrics = taskEnd.taskMetrics
    inputBytes += metrics.inputMetrics.bytesRead
  }
})

// Old-style schema: placeholder column names _col0/_col1/_col2, as in ORC files
// written before HIVE-4243 (no real field names).
spark.sql("create table orc_table_old_schema (_col0 int, _col1 string, _col2 double) STORED AS ORC")
spark.sql("insert overwrite table orc_table_old_schema select 1, 'name1', 1000.05")
inputBytes = 0L
spark.sql("select _col2 from orc_table_old_schema").show()
print("input bytes for old schema table: " + inputBytes) // prints 1655

// New-style schema: real field names stored in the ORC footer.
spark.sql("create table orc_table_new_schema (id int, name string, value double) STORED AS ORC")
spark.sql("insert overwrite table orc_table_new_schema select 1, 'name1', 1000.05")
inputBytes = 0L
spark.sql("select value from orc_table_new_schema").show()
print("input bytes for new schema table: " + inputBytes) // prints 1641
{code}
This example was run on Spark 3.0 with default flags. In it, I create the ORC table orc_table_old_schema, whose schema has only placeholder field names (as in data written before HIVE-4243), to trigger this issue. You can see that the input bytes for orc_table_old_schema are 14 bytes more than for orc_table_new_schema.
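
As a sanity check, reading the table directories directly shows the physical field names stored in the ORC footers (the warehouse paths below are placeholders, adjust them to your setup):
{code:java}
// Placeholder paths: point these at the tables' actual storage locations.
spark.read.orc("spark-warehouse/orc_table_old_schema").printSchema()
// expected: _col0, _col1, _col2 -- only placeholder names, i.e. the pre-HIVE-4243 layout
spark.read.orc("spark-warehouse/orc_table_new_schema").printSchema()
// expected: id, name, value -- real field names
{code}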

The reason is that Spark 3.0 uses the native reader by default (rather than the Hive reader) for ORC tables, and the native reader reads all columns for a table with the old Hive schema, while it reads only the pruned columns (in this example, only the column 'value') for a table with the new Hive schema.
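
Note that the difference does not show up in the query plan: column pruning is applied at the planning level in both cases, which is why the listener's bytesRead metric is used above instead of explain(). For example (output paraphrased, details may differ by version):
{code:java}
// Both plans show a pruned ReadSchema; only the bytes actually read differ.
spark.sql("select _col2 from orc_table_old_schema").explain()
// ... FileScan orc ... ReadSchema: struct<_col2:double>
spark.sql("select value from orc_table_new_schema").explain()
// ... FileScan orc ... ReadSchema: struct<value:double>
{code}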

The following flags enable the native reader: set spark.sql.hive.convertMetastoreOrc=true; set spark.sql.orc.impl=native. Both are Spark 3.0's default values.
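
For reference, this is how I pin those settings explicitly in a session. Flipping convertMetastoreOrc back to false is the workaround we currently rely on, since we do not see this regression with the Hive reader (at the cost of giving up the native reader entirely):
{code:java}
// Spark 3.0 defaults: metastore ORC tables are converted and read by the native reader.
spark.conf.set("spark.sql.hive.convertMetastoreOrc", "true")
spark.conf.set("spark.sql.orc.impl", "native")

// Current workaround on our side: stay on the Hive SerDe reader for now.
// spark.conf.set("spark.sql.hive.convertMetastoreOrc", "false")
{code}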

Then I dug into the Spark code and found this: [OrcUtils.scala#L149|https://github.com/apache/spark/blob/branch-3.0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala#L149]

It looks like reading all columns for the old Hive schema (which has no real field names) is by design in Spark 3.0.
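
To make the point concrete, here is a simplified sketch of the check I mean (my own paraphrase, not the actual code in OrcUtils.scala): when every physical field has a placeholder name like _col0, _col1, ..., the file is treated as old Hive output and, in 3.0, the native reader does not prune columns for it.
{code:java}
// Illustrative only -- a paraphrase of the observed behavior, not the Spark source.
def hasOnlyPlaceholderNames(fieldNames: Seq[String]): Boolean =
  fieldNames.nonEmpty &&
    fieldNames.zipWithIndex.forall { case (name, i) => name == s"_col$i" }

hasOnlyPlaceholderNames(Seq("_col0", "_col1", "_col2")) // true  -> 3.0 reads all columns
hasOnlyPlaceholderNames(Seq("id", "name", "value"))     // false -> only pruned columns are read
{code}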

In my company's data, some tables have the old Hive schema while others have the new one. Query performance on the old Hive tables drops significantly when I enable the native reader in Spark 3.0. This is the main blocker for us in switching from the Hive reader to the native reader in Spark 3.0.

My questions are:

#1 Is there a plan to support column pruning for the old Hive schema in the native ORC reader?

#2 If the answer to #1 is no, is there some potential issue that would arise if the code were changed to support column pruning?

 

> old hive table's all columns are read when column pruning applies in spark3.0
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-35274
>                 URL: https://issues.apache.org/jira/browse/SPARK-35274
>             Project: Spark
>          Issue Type: Question
>          Components: SQL
>    Affects Versions: 3.0.0
>         Environment: spark3.0
>            Reporter: xiaoli
>            Priority: Major
>


