kanika dhuria created SPARK-22207:
-------------------------------------

             Summary: High memory usage when converting relational data to 
Hierarchical data
                 Key: SPARK-22207
                 URL: https://issues.apache.org/jira/browse/SPARK-22207
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 2.1.0
            Reporter: kanika dhuria


Have 4 tables 
lineitems ~1.4Gb,
orders ~ 330MB
customer ~47MB
nations ~ 2.2K

These tables are related as follows
There are multiple lineitems per order (pk, fk:orderkey)
There are multiple orders per customer(pk,fk: cust_key)
There are multiple customers per nation(pk, fk:nation key)

Data is almost evenly distributed.

Building hierarchy till 3 levels i.e joining lineitems, orders, customers works 
good with executor memory 4Gb/2cores
Adding nations require 8GB/2 cores or 4GB/1 core memory.

==============================================================

{noformat}
val sqlContext = SparkSession.builder() .enableHiveSupport() 
.config("spark.sql.retainGroupColumns", false) 
.config("spark.sql.crossJoin.enabled", true) .getOrCreate()
 
  val orders = sqlContext.sql("select * from orders")
  val lineItem = sqlContext.sql("select * from lineitems")
  
  val customer = sqlContext.sql("select * from customers")
  
  val nation = sqlContext.sql("select * from nations")
  
  val lineitemOrders = 
lineItem.groupBy(col("l_orderkey")).agg(col("l_orderkey"), 
collect_list(struct(col("l_partkey"), 
col("l_suppkey"),col("l_linenumber"),col("l_quantity"),col("l_extendedprice"),col("l_discount"),col("l_tax"),col("l_returnflag"),col("l_linestatus"),col("l_shipdate"),col("l_commitdate"),col("l_receiptdate"),col("l_shipinstruct"),col("l_shipmode"))).as("lineitem")).join(orders,
 orders("O_ORDERKEY")=== lineItem("l_orderkey")).select(col("O_ORDERKEY"), 
col("O_CUSTKEY"),  col("O_ORDERSTATUS"), col("O_TOTALPRICE"), 
col("O_ORDERDATE"), col("O_ORDERPRIORITY"), col("O_CLERK"), 
col("O_SHIPPRIORITY"), col("O_COMMENT"),  col("lineitem"))  
  
  val customerList = 
lineitemOrders.groupBy(col("o_custkey")).agg(col("o_custkey"),collect_list(struct(col("O_ORDERKEY"),
 col("O_CUSTKEY"),  col("O_ORDERSTATUS"), col("O_TOTALPRICE"), 
col("O_ORDERDATE"), col("O_ORDERPRIORITY"), col("O_CLERK"), 
col("O_SHIPPRIORITY"), 
col("O_COMMENT"),col("lineitem"))).as("items")).join(customer,customer("c_custkey")===
 
lineitemOrders("o_custkey")).select(col("c_custkey"),col("c_name"),col("c_nationkey"),col("items"))


 val nationList = 
customerList.groupBy(col("c_nationkey")).agg(col("c_nationkey"),collect_list(struct(col("c_custkey"),col("c_name"),col("c_nationkey"),col("items"))).as("custList")).join(nation,nation("n_nationkey")===customerList("c_nationkey")).select(col("n_nationkey"),col("n_name"),col("custList"))
 
  nationList.write.mode("overwrite").json("filePath")

{noformat}

========================================================
If the customeList is saved in a file and then the last agg/join is run 
separately, it does run fine in 4GB/2 core .

I can provide the data if needed.




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to