[ 
https://issues.apache.org/jira/browse/SPARK-24316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bimalendu Choudhary updated SPARK-24316:
----------------------------------------
    Description: 
When we create a table from a DataFrame using Spark SQL with around 6k columns or more, even a simple query fetching 70k rows takes 20 minutes, while the same query against an identical table created through Hive with the same data takes only 5 minutes.

Instrumenting the code, we see that the executors spend most of their time looping inside initializeInternal(), and the executor appears to be stalled there for a long time.

{code:java|title=spark/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java|borderStyle=solid}
private void initializeInternal() {
  ...
  for (int i = 0; i < requestedSchema.getFieldCount(); ++i) {
    ...
  }
}
{code}
 

When Spark SQL creates a table, it also stores the table schema as JSON in the TBLPROPERTIES. We see that if we remove this metadata from the table, the queries become fast again, which is the behavior when the same table is created through Hive. The exact same table takes 5 times longer to query with the JSON metadata than without it.

So it looks like once the number of columns grows beyond 5k to 6k, processing and comparing this metadata becomes more and more expensive and performance degrades drastically.
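
For reference, a minimal sketch (not from the original report) of how to inspect and strip this metadata through Spark SQL. It assumes the schema is stored under the spark.sql.sources.schema.* keys, as Spark 2.x typically does for tables it creates, and uses a placeholder table name; adjust the keys to whatever SHOW TBLPROPERTIES actually lists:

{code:scala}
// List the table properties; on tables created by Spark SQL the JSON schema
// is spread across spark.sql.sources.schema.part.N entries.
spark.sql("SHOW TBLPROPERTIES tableName").show(100, false)

// Hypothetical cleanup for the experiment above: unset the schema parts that
// the previous command reported (extend the list to all part.N keys present).
spark.sql("""
  ALTER TABLE tableName UNSET TBLPROPERTIES IF EXISTS (
    'spark.sql.sources.schema.numParts',
    'spark.sql.sources.schema.part.0'
  )
""")
{code}

With those properties removed, the repro query below should behave much like the Hive-created table.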

To recreate the problem, simply run the following query:

import org.apache.spark.sql.SparkSession

val resp_data = spark.sql("SELECT * FROM duplicatefgv limit 70000")
resp_data.write.format("csv").save("/tmp/filename")

The table should be created by Spark SQL from a DataFrame so that the JSON metadata is stored, for example:

val dff = spark.read.format("csv").load("hdfs:///tmp/test.csv")
dff.createOrReplaceTempView("my_temp_table")
val tmp = spark.sql("create table tableName stored as parquet as select * from my_temp_table")

 

 

from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)
resp_data = spark.sql("select * from test").limit(2000)
print(resp_data.count())
resp_data.write.option('header', False).mode('overwrite').csv('/tmp/2.csv')

 

 

The performance seems to be even slower in this loop if the schema does not match or fields are empty, so that the code takes the branch marking the column as missing:

missingColumns[i] = true;
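
As a rough diagnostic (not part of the original report), one can compare the column names in the Parquet files with the table schema to see whether this branch is being taken; the warehouse path below is hypothetical and should point at the table's actual location:

{code:scala}
// Columns present in the table schema but absent from the Parquet files are
// the ones the vectorized reader marks as missing.
val tableCols = spark.table("tableName").schema.fieldNames.toSet
val fileCols = spark.read.parquet("hdfs:///user/hive/warehouse/tablename").schema.fieldNames.toSet
println(s"Columns missing from the files: ${(tableCols -- fileCols).mkString(", ")}")
{code}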

 

> Spark SQL queries stall for column width more than 6k for Parquet-based tables
> -------------------------------------------------------------------------
>
>                 Key: SPARK-24316
>                 URL: https://issues.apache.org/jira/browse/SPARK-24316
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.2.0, 2.2.1, 2.3.0, 2.4.0
>            Reporter: Bimalendu Choudhary
>            Priority: Major
>


