[ 
https://issues.apache.org/jira/browse/HUDI-9420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Davis Zhang updated HUDI-9420:
------------------------------
    Description: 
{code:java}
scala> 


scala> spark.sql("set 
spark.sql.optimizer.dynamicPruningOnIndexedCol.enabled=false")
res310: org.apache.spark.sql.DataFrame = [key: string, value: string]


scala> val startTime = System.currentTimeMillis()
startTime: Long = 1747436173549


scala> 


scala> spark.sql("""
     |   SELECT h.secKey
     |   FROM hudi h
     |   JOIN parquet_table p
     |   ON h.key = p.key
     |   and p.partition="2025-01-21"
     |   limit 10
     | """).collect()
res311: Array[org.apache.spark.sql.Row] = 
Array([000-108b5602-3e05-45f6-95ae-eb8bbc714b80], 
[000-74d77871-1295-4763-90b1-101b4d5edea2], 
[000-a7ad8185-5487-4376-b132-80de4637204d], 
[000-6c787dc5-a040-449c-9ffc-4835d0cea95a], 
[000-901fc912-ce18-41d2-b868-550e196e5608], 
[000-6931bbb2-7cea-42bb-be43-863570324bec], 
[000-c43ecf5c-d9d1-47d8-8543-aeded4b23866], 
[000-032b3c28-49b1-4bd4-bdd3-f665686728bd], 
[000-30fddb94-02a5-4727-bbd7-cccf72c6023e], 
[000-2483dab1-3ea8-4b33-8607-079f94d1fe32])


scala> 


scala> // Calculate and print duration


scala> val endTime = System.currentTimeMillis()
endTime: Long = 1747436281181


scala> val durationInSeconds = (endTime - startTime) / 1000.0
durationInSeconds: Double = 107.632


scala> println(s"Query execution time: $durationInSeconds seconds")
Query execution time: 107.632 seconds


scala> 


scala> 


scala> spark.sql("set 
spark.sql.optimizer.dynamicPruningOnIndexedCol.enabled=false")
res313: org.apache.spark.sql.DataFrame = [key: string, value: string]


scala> val startTime = System.currentTimeMillis()
startTime: Long = 1747436281326


scala> 


scala> spark.sql("""
     |   SELECT h.textField1
     |   FROM hudi h
     |   JOIN parquet_table p
     |   ON h.key = p.key
     |   and p.partition="2025-01-21"
     |   limit 10
     | """).collect()
res314: Array[org.apache.spark.sql.Row] = 
Array([abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz],
 
[abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz],
 
[abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz],
 [abcdefghij...


scala> 


scala> // Calculate and print duration


scala> val endTime = System.currentTimeMillis()
endTime: Long = 1747436327443


scala> val durationInSeconds = (endTime - startTime) / 1000.0
durationInSeconds: Double = 46.117


scala> println(s"Query execution time: $durationInSeconds seconds")
Query execution time: 46.117 seconds

 {code}
Table schema
{code:java}
{    "name" : "key", ←------- Record key with RLI enabled (OH hudi case)    
"type" : [ "null", "string" ],    "default" : null  }, {    "name" : "secKey", 
←---- column with secondary index   "type" : [ "null", "string" ],    "default" 
: null  }, {    "name" : "date",    "type" : [ "null", "string" ],    "default" 
: null  }, {    "name" : "intField",    "type" : [ "null", "long" ],    
"default" : null  }, {    "name" : "city",    "type" : [ "null", "string" ],    
"default" : null  }, {    "name" : "textField1", ←----- 256 KB before 
compression    "type" : [ "null", "string" ],    "default" : null  }, {    
"name" : "textField2",←----- 256 KB before compression    "type" : [ "null", 
"string" ],    "default" : null  }, {    "name" : "textField3",←----- 512 KB 
before compression    "type" : [ "null", "string" ],    "default" : null  }, {  
  "name" : "textField4",←----- 1024 KB/1 MB before compression    "type" : [ 
"null", "string" ],    "default" : null  }, {    "name" : "decimalField",    
"type" : [ "null", "float" ],    "default" : null  }, {    "name" : 
"longField",    "type" : [ "null", "long" ],    "default" : null  }, {    
"name" : "partition", ←---------- used as partition column    "type" : [ 
"null", "int" ],    "default" : null  }, {    "name" : "incrLongField",    
"type" : [ "null", "long" ],    "default" : null  } ]} {code}
 * COW table created with hudi 1.x
 * *10 partitions, per partition 10GB* worth of data, *total size is 100GB*
 * The fan out from secIdx -> record_key would be 1 as they are independently 
generated UUID.
 * Per record size before compression is around 2 MB.
 * Per index file group size on disk is fined tuned to be 55-60 MB.
 * Per data block size on disk is 128 MB.

same thing happens if I select record key column, it takes significantly longer 
time than normal data columns even though both are of string type and size wise 
the record key/secKey is merely 64bytes UUID while the string data column is 
256KB.

 

Spark 3.5.2. Hudi 1.0.2

  was:
{code:java}
scala> 


scala> spark.sql("set 
spark.sql.optimizer.dynamicPruningOnIndexedCol.enabled=false")
res310: org.apache.spark.sql.DataFrame = [key: string, value: string]


scala> val startTime = System.currentTimeMillis()
startTime: Long = 1747436173549


scala> 


scala> spark.sql("""
     |   SELECT h.secKey
     |   FROM hudi h
     |   JOIN parquet_table p
     |   ON h.key = p.key
     |   and p.partition="2025-01-21"
     |   limit 10
     | """).collect()
res311: Array[org.apache.spark.sql.Row] = 
Array([000-108b5602-3e05-45f6-95ae-eb8bbc714b80], 
[000-74d77871-1295-4763-90b1-101b4d5edea2], 
[000-a7ad8185-5487-4376-b132-80de4637204d], 
[000-6c787dc5-a040-449c-9ffc-4835d0cea95a], 
[000-901fc912-ce18-41d2-b868-550e196e5608], 
[000-6931bbb2-7cea-42bb-be43-863570324bec], 
[000-c43ecf5c-d9d1-47d8-8543-aeded4b23866], 
[000-032b3c28-49b1-4bd4-bdd3-f665686728bd], 
[000-30fddb94-02a5-4727-bbd7-cccf72c6023e], 
[000-2483dab1-3ea8-4b33-8607-079f94d1fe32])


scala> 


scala> // Calculate and print duration


scala> val endTime = System.currentTimeMillis()
endTime: Long = 1747436281181


scala> val durationInSeconds = (endTime - startTime) / 1000.0
durationInSeconds: Double = 107.632


scala> println(s"Query execution time: $durationInSeconds seconds")
Query execution time: 107.632 seconds


scala> 


scala> 


scala> spark.sql("set 
spark.sql.optimizer.dynamicPruningOnIndexedCol.enabled=false")
res313: org.apache.spark.sql.DataFrame = [key: string, value: string]


scala> val startTime = System.currentTimeMillis()
startTime: Long = 1747436281326


scala> 


scala> spark.sql("""
     |   SELECT h.textField1
     |   FROM hudi h
     |   JOIN parquet_table p
     |   ON h.key = p.key
     |   and p.partition="2025-01-21"
     |   limit 10
     | """).collect()
res314: Array[org.apache.spark.sql.Row] = 
Array([abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz],
 
[abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz],
 
[abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz],
 [abcdefghij...


scala> 


scala> // Calculate and print duration


scala> val endTime = System.currentTimeMillis()
endTime: Long = 1747436327443


scala> val durationInSeconds = (endTime - startTime) / 1000.0
durationInSeconds: Double = 46.117


scala> println(s"Query execution time: $durationInSeconds seconds")
Query execution time: 46.117 seconds

 {code}
Table schema
{code:java}
{    "name" : "key", ←------- Record key with RLI enabled (OH hudi case)    
"type" : [ "null", "string" ],    "default" : null  }, 
{    "name" : "secKey", ←---- column with secondary index   "type" : [ "null", 
"string" ],    "default" : null  }, 
{    "name" : "date",    "type" : [ "null", "string" ],    "default" : null  }, 
{    "name" : "intField",    "type" : [ "null", "long" ],    "default" : null  
}, 
{    "name" : "city",    "type" : [ "null", "string" ],    "default" : null  }, 
{    "name" : "textField1", ←----- 256 KB before compression    "type" : [ 
"null", "string" ],    "default" : null  }, 
{    "name" : "textField2",←----- 256 KB before compression    "type" : [ 
"null", "string" ],    "default" : null  }, 
{    "name" : "textField3",←----- 512 KB before compression    "type" : [ 
"null", "string" ],    "default" : null  }, 
{    "name" : "textField4",←----- 1024 KB/1 MB before compression    "type" : [ 
"null", "string" ],    "default" : null  }, 
{    "name" : "decimalField",    "type" : [ "null", "float" ],    "default" : 
null  }, 
{    "name" : "longField",    "type" : [ "null", "long" ],    "default" : null  
}, 
{    "name" : "partition", ←---------- used as partition column    "type" : [ 
"null", "int" ],    "default" : null  }, 
{    "name" : "incrLongField",    "type" : [ "null", "long" ],    "default" : 
null  } ]} {code}
 * COW table created with hudi 1.x
 * *10 partitions, per partition 10GB* worth of data, *total size is 100GB*
 * The fan out from secIdx -> record_key would be 1 as they are *independently 
generated UUID.*
 * Per record size before compression is around 2 MB.
 * Per index file group size on disk is fined tuned to be 55-60 MB.
 * Per data block size on disk is 128 MB.

 

Spark OSS 3.5.2. Hudi 1.0.2


> Spark sql select some column is significantly slower than some others
> ---------------------------------------------------------------------
>
>                 Key: HUDI-9420
>                 URL: https://issues.apache.org/jira/browse/HUDI-9420
>             Project: Apache Hudi
>          Issue Type: Bug
>            Reporter: Davis Zhang
>            Priority: Major
>
> {code:java}
> scala> 
> scala> spark.sql("set 
> spark.sql.optimizer.dynamicPruningOnIndexedCol.enabled=false")
> res310: org.apache.spark.sql.DataFrame = [key: string, value: string]
> scala> val startTime = System.currentTimeMillis()
> startTime: Long = 1747436173549
> scala> 
> scala> spark.sql("""
>      |   SELECT h.secKey
>      |   FROM hudi h
>      |   JOIN parquet_table p
>      |   ON h.key = p.key
>      |   and p.partition="2025-01-21"
>      |   limit 10
>      | """).collect()
> res311: Array[org.apache.spark.sql.Row] = 
> Array([000-108b5602-3e05-45f6-95ae-eb8bbc714b80], 
> [000-74d77871-1295-4763-90b1-101b4d5edea2], 
> [000-a7ad8185-5487-4376-b132-80de4637204d], 
> [000-6c787dc5-a040-449c-9ffc-4835d0cea95a], 
> [000-901fc912-ce18-41d2-b868-550e196e5608], 
> [000-6931bbb2-7cea-42bb-be43-863570324bec], 
> [000-c43ecf5c-d9d1-47d8-8543-aeded4b23866], 
> [000-032b3c28-49b1-4bd4-bdd3-f665686728bd], 
> [000-30fddb94-02a5-4727-bbd7-cccf72c6023e], 
> [000-2483dab1-3ea8-4b33-8607-079f94d1fe32])
> scala> 
> scala> // Calculate and print duration
> scala> val endTime = System.currentTimeMillis()
> endTime: Long = 1747436281181
> scala> val durationInSeconds = (endTime - startTime) / 1000.0
> durationInSeconds: Double = 107.632
> scala> println(s"Query execution time: $durationInSeconds seconds")
> Query execution time: 107.632 seconds
> scala> 
> scala> 
> scala> spark.sql("set 
> spark.sql.optimizer.dynamicPruningOnIndexedCol.enabled=false")
> res313: org.apache.spark.sql.DataFrame = [key: string, value: string]
> scala> val startTime = System.currentTimeMillis()
> startTime: Long = 1747436281326
> scala> 
> scala> spark.sql("""
>      |   SELECT h.textField1
>      |   FROM hudi h
>      |   JOIN parquet_table p
>      |   ON h.key = p.key
>      |   and p.partition="2025-01-21"
>      |   limit 10
>      | """).collect()
> res314: Array[org.apache.spark.sql.Row] = 
> Array([abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz],
>  
> [abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz],
>  
> [abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz],
>  [abcdefghij...
> scala> 
> scala> // Calculate and print duration
> scala> val endTime = System.currentTimeMillis()
> endTime: Long = 1747436327443
> scala> val durationInSeconds = (endTime - startTime) / 1000.0
> durationInSeconds: Double = 46.117
> scala> println(s"Query execution time: $durationInSeconds seconds")
> Query execution time: 46.117 seconds
>  {code}
> Table schema
> {code:java}
> {    "name" : "key", ←------- Record key with RLI enabled (OH hudi case)    
> "type" : [ "null", "string" ],    "default" : null  }, {    "name" : 
> "secKey", ←---- column with secondary index   "type" : [ "null", "string" ],  
>   "default" : null  }, {    "name" : "date",    "type" : [ "null", "string" 
> ],    "default" : null  }, {    "name" : "intField",    "type" : [ "null", 
> "long" ],    "default" : null  }, {    "name" : "city",    "type" : [ "null", 
> "string" ],    "default" : null  }, {    "name" : "textField1", ←----- 256 KB 
> before compression    "type" : [ "null", "string" ],    "default" : null  }, 
> {    "name" : "textField2",←----- 256 KB before compression    "type" : [ 
> "null", "string" ],    "default" : null  }, {    "name" : "textField3",←----- 
> 512 KB before compression    "type" : [ "null", "string" ],    "default" : 
> null  }, {    "name" : "textField4",←----- 1024 KB/1 MB before compression    
> "type" : [ "null", "string" ],    "default" : null  }, {    "name" : 
> "decimalField",    "type" : [ "null", "float" ],    "default" : null  }, {    
> "name" : "longField",    "type" : [ "null", "long" ],    "default" : null  }, 
> {    "name" : "partition", ←---------- used as partition column    "type" : [ 
> "null", "int" ],    "default" : null  }, {    "name" : "incrLongField",    
> "type" : [ "null", "long" ],    "default" : null  } ]} {code}
>  * COW table created with hudi 1.x
>  * *10 partitions, per partition 10GB* worth of data, *total size is 100GB*
>  * The fan out from secIdx -> record_key would be 1 as they are independently 
> generated UUID.
>  * Per record size before compression is around 2 MB.
>  * Per index file group size on disk is fined tuned to be 55-60 MB.
>  * Per data block size on disk is 128 MB.
> same thing happens if I select record key column, it takes significantly 
> longer time than normal data columns even though both are of string type and 
> size wise the record key/secKey is merely 64bytes UUID while the string data 
> column is 256KB.
>  
> Spark 3.5.2. Hudi 1.0.2



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to