wangxiaoying opened a new issue, #423:
URL: https://github.com/apache/incubator-wayang/issues/423

   ### Description
   
   I'm trying to run TPC-H Q3 and compare the performance of Wayang and Spark SQL under the following setup:
   
   * Both Spark (3.5.1) and Wayang run on a local VM with 32 CPU cores and 128 GB of memory
   * A postgres instance holding all the TPC-H tables (sf=10) runs on a remote VM
   
   I try to keep the Spark settings the same in both runs. For Q3, Wayang took around 3 minutes while Spark took only 40 seconds.
   
   ### To reproduce
   
   To run Wayang, I compile the project locally (using **tag 1.0.0**) and use the benchmark code under `wayang-benchmark` directly: `./wayang-1.0.0-SNAPSHOT/bin/wayang-submit org.apache.wayang.apps.tpch.TpcH exp\(123\) spark,postgres file:///path/to/wayang.properties Q3`
   
   The wayang.properties file looks like the following:
   ```
   wayang.postgres.jdbc.url = jdbc:postgresql://{POSTGRES_IP}:{POSTGRES_PORT}/{TPCH_DB}
   wayang.postgres.jdbc.user = {POSTGRES_USER}
   wayang.postgres.jdbc.password = {POSTGRES_PASSWORD}
   
   spark.master = local[32]
   spark.driver.memory = 110G
   spark.executor.memory = 110G
   spark.executor.cores = 32
   wayang.giraph.hdfs.tempdir = file:///tmp/result/
   
   spark.rdd.compress = true
   spark.log.level = INFO
   ```
   
   To run Spark, I use the following code:
   ```python
   import sys
   import time
   from pyspark.sql import SparkSession
   from contexttimer import Timer
   
   SPARK_JARS = "path/to/jar/postgresql-42.3.8.jar"
   POSTGRES_URL = "jdbc:postgresql://{POSTGRES_IP}:{POSTGRES_PORT}/{TPCH_DB}"
   POSTGRES_USER = "{POSTGRES_USER}"
   POSTGRES_PASSWORD = "{POSTGRES_PASSWORD}"
   
   TPCH_Q3 = """SELECT
       l_orderkey,
       sum(l_extendedprice * (1 - l_discount)) AS revenue,
       o_orderdate,
       o_shippriority
   FROM
       customer,
       orders,
       lineitem
   WHERE
       c_mktsegment = 'BUILDING'
       AND c_custkey = o_custkey
       AND l_orderkey = o_orderkey
       AND o_orderdate < CAST('1995-03-15' AS date)
       AND l_shipdate > CAST('1995-03-15' AS date)
   GROUP BY
       l_orderkey,
       o_orderdate,
       o_shippriority
   ORDER BY
       revenue DESC,
       o_orderdate"""
   
   def registerPostgres(spark, tables, url):
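       # Register each postgres table as a temporary view backed by Spark's JDBC source.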
       for name in tables:
           spark.sql(f"""
               CREATE TEMPORARY VIEW {name}
               USING org.apache.spark.sql.jdbc
               OPTIONS (
                 driver "org.postgresql.Driver",
                 url "{url}",
                 dbtable "public.{name}",
                 user '{POSTGRES_USER}',
                 password '{POSTGRES_PASSWORD}',
                 pushDownAggregate 'true'
               )
               """)
               
   
   def registerViews(spark):
       registerPostgres(spark, ["lineitem", "customer", "orders", "nation", 
"region", "supplier", "part", "partsupp"], POSTGRES_URL)
   
   
   def run_query(spark, query):
       # Time only the query execution; collect() forces full evaluation.
       with Timer() as timer:
           df = spark.sql(query)
           df.collect()
       # count() re-runs the query, but it is outside the timed section.
       print(f"got {df.count()} rows, {len(df.columns)} cols")
       print("plan:")
       df.explain()  # explain() prints the plan itself and returns None
       print(f"took {timer.elapsed:.2f}s in total")
       # print(df)
       print()
       sys.stdout.flush()
   
           
   
   if __name__ == '__main__':
   
       spark = (
           SparkSession.builder.master("local[32]")
           .appName("test-spark")
           .config("spark.jars", SPARK_JARS)
           .config("spark.executor.memory", "110g")
           .config("spark.driver.memory", "110g")
           .config("spark.log.level", "INFO")
           .config("spark.ui.port", "4040")
           .getOrCreate()
       )
   
       print(spark.sparkContext.getConf().getAll())
       registerViews(spark)
   
       run_query(spark, TPCH_Q3)
       time.sleep(2)
       spark.stop()
   
   ```
   
   ### Some investigation
   
   The queries used to fetch data from postgres are basically the same on both platforms (filter and projection pushdown are enabled in both cases).
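   
   One way to double-check the Spark-side pushdown is to look at the formatted physical plan; a minimal sketch, reusing the `spark` session and `TPCH_Q3` from the script above:
   
   ```python
   # The formatted plan prints the JDBC relation scan together with its
   # PushedFilters, i.e. the predicates that actually reach postgres.
   df = spark.sql(TPCH_Q3)
   df.explain(mode="formatted")
   ```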
   
   I tried to print as many logs of the Spark execution as I could to see the difference between the two. One significant overhead I found is that for the join, Wayang produces a much larger `ShuffleMapTask` binary than Spark does (~46,500,000 bytes vs. ~8,000 bytes). The tasks (64 in total) are serialized one by one at ~2 seconds each, which results in about 1 minute of overhead. On Spark, by contrast, the serialization time is negligible.
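   
   (For reference, this kind of task-binary size is also visible in the driver log: Spark's `DAGScheduler` emits a "Broadcasting large task binary with size ..." warning once the serialized task binary gets large. A minimal sketch for pulling those sizes out of a plain-text driver log; the log path is just a placeholder:)
   
   ```python
   import re
   
   # Collect the reported task binary sizes from a Spark driver log.
   # The warning text is emitted by DAGScheduler when a task binary is large.
   def large_task_binaries(log_path):
       pat = re.compile(r"Broadcasting large task binary with size (.+)")
       with open(log_path) as f:
           return [m.group(1).strip() for line in f if (m := pat.search(line))]
   
   print(large_task_binaries("/tmp/spark-driver.log"))  # placeholder path
   ```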
   
   I'm not very familiar with Spark's execution internals, so I'm not sure why this is the case. Can anyone give me a pointer? Is there anything I'm missing, e.g. in the configuration? Thank you!

