[
https://issues.apache.org/jira/browse/HUDI-9420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17952298#comment-17952298
]
Davis Zhang commented on HUDI-9420:
-----------------------------------
It turns out that because the columns backed by SI and RLI contain distinct
UUIDs, while the text field reuses the same value across all rows, the parquet
compression ratio is very different. In the textField case, the amount of data
actually transferred over the network/read from disk is much lower than in the
UUID case due to compression.
> Spark sql select some column is significantly slower than some others
> ---------------------------------------------------------------------
>
> Key: HUDI-9420
> URL: https://issues.apache.org/jira/browse/HUDI-9420
> Project: Apache Hudi
> Issue Type: Bug
> Reporter: Davis Zhang
> Priority: Major
>
> {code:java}
> scala>
> scala> spark.sql("set
> spark.sql.optimizer.dynamicPruningOnIndexedCol.enabled=false")
> res310: org.apache.spark.sql.DataFrame = [key: string, value: string]
> scala> val startTime = System.currentTimeMillis()
> startTime: Long = 1747436173549
> scala>
> scala> spark.sql("""
> | SELECT h.secKey
> | FROM hudi h
> | JOIN parquet_table p
> | ON h.key = p.key
> | and p.partition="2025-01-21"
> | limit 10
> | """).collect()
> res311: Array[org.apache.spark.sql.Row] =
> Array([000-108b5602-3e05-45f6-95ae-eb8bbc714b80],
> [000-74d77871-1295-4763-90b1-101b4d5edea2],
> [000-a7ad8185-5487-4376-b132-80de4637204d],
> [000-6c787dc5-a040-449c-9ffc-4835d0cea95a],
> [000-901fc912-ce18-41d2-b868-550e196e5608],
> [000-6931bbb2-7cea-42bb-be43-863570324bec],
> [000-c43ecf5c-d9d1-47d8-8543-aeded4b23866],
> [000-032b3c28-49b1-4bd4-bdd3-f665686728bd],
> [000-30fddb94-02a5-4727-bbd7-cccf72c6023e],
> [000-2483dab1-3ea8-4b33-8607-079f94d1fe32])
> scala>
> scala> // Calculate and print duration
> scala> val endTime = System.currentTimeMillis()
> endTime: Long = 1747436281181
> scala> val durationInSeconds = (endTime - startTime) / 1000.0
> durationInSeconds: Double = 107.632
> scala> println(s"Query execution time: $durationInSeconds seconds")
> Query execution time: 107.632 seconds
> scala>
> scala>
> scala> spark.sql("set
> spark.sql.optimizer.dynamicPruningOnIndexedCol.enabled=false")
> res313: org.apache.spark.sql.DataFrame = [key: string, value: string]
> scala> val startTime = System.currentTimeMillis()
> startTime: Long = 1747436281326
> scala>
> scala> spark.sql("""
> | SELECT h.textField1
> | FROM hudi h
> | JOIN parquet_table p
> | ON h.key = p.key
> | and p.partition="2025-01-21"
> | limit 10
> | """).collect()
> res314: Array[org.apache.spark.sql.Row] =
> Array([abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz],
>
> [abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz],
>
> [abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz],
> [abcdefghij...
> scala>
> scala> // Calculate and print duration
> scala> val endTime = System.currentTimeMillis()
> endTime: Long = 1747436327443
> scala> val durationInSeconds = (endTime - startTime) / 1000.0
> durationInSeconds: Double = 46.117
> scala> println(s"Query execution time: $durationInSeconds seconds")
> Query execution time: 46.117 seconds
> {code}
> Table schema
> {code:java}
> { "name" : "key", ←------- Record key with RLI enabled (OH hudi case)
> "type" : [ "null", "string" ], "default" : null },
> { "name" : "secKey", ←---- column with secondary index "type" : [
> "null", "string" ], "default" : null },
> { "name" : "date", "type" : [ "null", "string" ], "default" : null
> },
> { "name" : "intField", "type" : [ "null", "long" ], "default" : null
> },
> { "name" : "city", "type" : [ "null", "string" ], "default" : null
> },
> { "name" : "textField1", ←----- 256 KB before compression "type" : [
> "null", "string" ], "default" : null },
> { "name" : "textField2",←----- 256 KB before compression "type" : [
> "null", "string" ], "default" : null },
> { "name" : "textField3",←----- 512 KB before compression "type" : [
> "null", "string" ], "default" : null },
> { "name" : "textField4",←----- 1024 KB/1 MB before compression "type" :
> [ "null", "string" ], "default" : null },
> { "name" : "decimalField", "type" : [ "null", "float" ], "default" :
> null },
> { "name" : "longField", "type" : [ "null", "long" ], "default" :
> null },
> { "name" : "partition", ←---------- used as partition column "type" : [
> "null", "int" ], "default" : null },
> { "name" : "incrLongField", "type" : [ "null", "long" ], "default" :
> null } ]} {code}
> * COW table created with hudi 1.x
> * *10 partitions, per partition 10GB* worth of data, *total size is 100GB*
> * The fan out from secIdx -> record_key would be 1 as they are
> *independently generated UUID.*
> * Per record size before compression is around 2 MB.
> * Per index file group size on disk is fine-tuned to be 55-60 MB.
> * Per data block size on disk is 128 MB.
>
> Spark OSS 3.5.2. Hudi 1.0.2
>
> hardware - EMR, m7a.4xlarge, 6 executor node cluster. 40 GB memory
--
This message was sent by Atlassian Jira
(v8.20.10#820010)