[ https://issues.apache.org/jira/browse/SPARK-22207?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
kanika dhuria reopened SPARK-22207:
-----------------------------------

Same issue is seen in Spark 2.4.

> High memory usage when converting relational data to hierarchical data
> ----------------------------------------------------------------------
>
>                 Key: SPARK-22207
>                 URL: https://issues.apache.org/jira/browse/SPARK-22207
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.1.0
>            Reporter: kanika dhuria
>            Priority: Major
>              Labels: bulk-closed
>
> There are four tables:
> lineitems ~1.4 GB
> orders ~330 MB
> customer ~47 MB
> nations ~2.2 KB
> These tables are related as follows:
> There are multiple lineitems per order (pk, fk: orderkey)
> There are multiple orders per customer (pk, fk: cust_key)
> There are multiple customers per nation (pk, fk: nation key)
> Data is almost evenly distributed.
> Building the hierarchy to three levels, i.e. joining lineitems, orders, and customers, works fine with 4 GB/2 cores of executor memory.
> Adding nations requires 8 GB/2 cores or 4 GB/1 core.
> ==============================================================
> {noformat}
> val sqlContext = SparkSession.builder()
>   .enableHiveSupport()
>   .config("spark.sql.retainGroupColumns", false)
>   .config("spark.sql.crossJoin.enabled", true)
>   .getOrCreate()
>
> val orders = sqlContext.sql("select * from orders")
> val lineItem = sqlContext.sql("select * from lineitems")
> val customer = sqlContext.sql("select * from customers")
> val nation = sqlContext.sql("select * from nations")
>
> val lineitemOrders = lineItem
>   .groupBy(col("l_orderkey"))
>   .agg(col("l_orderkey"), collect_list(struct(col("l_partkey"), col("l_suppkey"),
>     col("l_linenumber"), col("l_quantity"), col("l_extendedprice"), col("l_discount"),
>     col("l_tax"), col("l_returnflag"), col("l_linestatus"), col("l_shipdate"),
>     col("l_commitdate"), col("l_receiptdate"), col("l_shipinstruct"),
>     col("l_shipmode"))).as("lineitem"))
>   .join(orders, orders("O_ORDERKEY") === lineItem("l_orderkey"))
>   .select(col("O_ORDERKEY"), col("O_CUSTKEY"), col("O_ORDERSTATUS"), col("O_TOTALPRICE"),
>     col("O_ORDERDATE"), col("O_ORDERPRIORITY"), col("O_CLERK"), col("O_SHIPPRIORITY"),
>     col("O_COMMENT"), col("lineitem"))
>
> val customerList = lineitemOrders
>   .groupBy(col("o_custkey"))
>   .agg(col("o_custkey"), collect_list(struct(col("O_ORDERKEY"), col("O_CUSTKEY"),
>     col("O_ORDERSTATUS"), col("O_TOTALPRICE"), col("O_ORDERDATE"), col("O_ORDERPRIORITY"),
>     col("O_CLERK"), col("O_SHIPPRIORITY"), col("O_COMMENT"), col("lineitem"))).as("items"))
>   .join(customer, customer("c_custkey") === lineitemOrders("o_custkey"))
>   .select(col("c_custkey"), col("c_name"), col("c_nationkey"), col("items"))
>
> val nationList = customerList
>   .groupBy(col("c_nationkey"))
>   .agg(col("c_nationkey"), collect_list(struct(col("c_custkey"), col("c_name"),
>     col("c_nationkey"), col("items"))).as("custList"))
>   .join(nation, nation("n_nationkey") === customerList("c_nationkey"))
>   .select(col("n_nationkey"), col("n_name"), col("custList"))
>
> nationList.write.mode("overwrite").json("filePath")
> {noformat}
> ========================================================
> If the customerList is saved to a file and the last agg/join is then run
> separately, it runs fine with 4 GB/2 cores.
> I can provide the data if needed.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
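The workaround mentioned in the report (materializing customerList and running the final agg/join against the persisted copy) could be sketched as follows. This is only a sketch based on the description above; the intermediate path "customerListTmp" is illustrative and not from the original report, and Parquet is an assumed format for the intermediate data.

{noformat}
// Materialize the intermediate result so the final stage does not have to
// execute the whole three-level aggregation plan in one go.
// "customerListTmp" is an illustrative path, not from the original report.
customerList.write.mode("overwrite").parquet("customerListTmp")

// Re-read the persisted copy, cutting the lineage of the earlier joins.
val savedCustomerList = sqlContext.read.parquet("customerListTmp")

// Run only the last agg/join, which the reporter says succeeds at 4 GB/2 cores.
val nationList = savedCustomerList
  .groupBy(col("c_nationkey"))
  .agg(col("c_nationkey"), collect_list(struct(col("c_custkey"), col("c_name"),
    col("c_nationkey"), col("items"))).as("custList"))
  .join(nation, nation("n_nationkey") === savedCustomerList("c_nationkey"))
  .select(col("n_nationkey"), col("n_name"), col("custList"))

nationList.write.mode("overwrite").json("filePath")
{noformat}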