I modify the tech query5 to DataFrame:
val forders = 
spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/orders 
<hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/orders>”).filter("o_orderdate
 < 1995-01-01 and o_orderdate >= 1994-01-01").select("o_custkey", "o_orderkey")
val flineitem = 
spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/lineitem
 <hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/lineitem>")
val fcustomer = 
spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/customer
 <hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/customer>")
val fsupplier = 
spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/supplier
 <hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/supplier>")
val fregion = 
spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/region 
<hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/region>”).where("r_name = 
'ASIA'").select($"r_regionkey")
val fnation = 
spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/nation 
<hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/nation>”)
val decrease = udf { (x: Double, y: Double) => x * (1 - y) }
val res =   flineitem.join(forders, $"l_orderkey" === forders("o_orderkey"))
     .join(fcustomer, $"o_custkey" === fcustomer("c_custkey"))
     .join(fsupplier, $"l_suppkey" === fsupplier("s_suppkey") && $"c_nationkey" 
=== fsupplier("s_nationkey"))
     .join(fnation, $"s_nationkey" === fnation("n_nationkey"))
     .join(fregion, $"n_regionkey" === fregion("r_regionkey"))
     .select($"n_name", decrease($"l_extendedprice", $"l_discount").as("value"))
     .groupBy($"n_name")
     .agg(sum($"value").as("revenue"))
     .sort($"revenue".desc).show()

My environment is one master(Hdfs-namenode), four workers(HDFS-datanode), each 
with 40 cores and 128GB memory.  TPCH 100G stored on HDFS using parquet format.
It executed about 1.5m, I found that read these 6 tables using 
spark.read.parqeut is sequential, How can I made this to run parallelly ?
 I’ve already set data locality and spark.default.parallelism, 
spark.serializer, using G1, But the runtime  is still not reduced. 
And is there any advices for me to tuning this performance?
Thank you.

Wenting He

Reply via email to