[
https://issues.apache.org/jira/browse/SPARK-40233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17598550#comment-17598550
]
Sean R. Owen commented on SPARK-40233:
--------------------------------------
That's expected behavior, right?
Spark is meant to read data sets directly, in parallel. In general you don't load
them on a single node and then ship them to Spark.
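As a sketch of that direct, parallel approach (illustrative only: `make_frame` and `scaled_max` are hypothetical helper names, and the Spark part assumes a live SparkSession on the reporter's cluster):

```python
def scaled_max(rows, unique_fraction):
    # Same key-range computation as the reporter's script
    # (max_val = r * unique), kept as an int for the cast below.
    return int(rows * unique_fraction)

def make_frame(spark, rows, max_val, num_partitions):
    """Generate a two-column random frame inside Spark itself, so no
    1Bn-row pandas frame ever has to exist on the driver."""
    from pyspark.sql import functions as F  # deferred: needed only with a live session
    return (spark.range(0, rows, numPartitions=num_partitions)
                 .select((F.rand() * max_val).cast("long").alias("col0"),
                         (F.rand() * max_val).cast("long").alias("col1")))
```

With data generated this way, the join benchmark never pays the driver-side pandas-to-Arrow conversion cost at all.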
> Unable to load large pandas dataframe to pyspark
> ------------------------------------------------
>
> Key: SPARK-40233
> URL: https://issues.apache.org/jira/browse/SPARK-40233
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 3.3.0
> Reporter: Niranda Perera
> Priority: Major
>
> I've been trying to join two large pandas dataframes with PySpark using the
> code below. I'm varying the executor cores allocated to the application to
> measure PySpark's scalability (strong scaling).
> {code:python}
> import gc
> import math
> import time
> import pandas as pd
> from numpy.random import default_rng
> from pyspark.sql import SparkSession
> r = 1000000000  # 1Bn rows
> it = 10
> w = 256
> unique = 0.9
> TOTAL_MEM = 240
> TOTAL_NODES = 14
> max_val = r * unique
> rng = default_rng()
> frame_data = rng.integers(0, max_val, size=(r, 2))
> frame_data1 = rng.integers(0, max_val, size=(r, 2))
> print("data generated", flush=True)
> df_l = pd.DataFrame(frame_data).add_prefix("col")
> df_r = pd.DataFrame(frame_data1).add_prefix("col")
> print("data loaded", flush=True)
> procs = int(math.ceil(w / TOTAL_NODES))
> mem = int(TOTAL_MEM * 0.9)
> print(f"world sz {w} procs per worker {procs} mem {mem} iter {it}", flush=True)
> spark = SparkSession \
>     .builder \
>     .appName(f'join {r} {w}') \
>     .master('spark://node:7077') \
>     .config('spark.executor.memory', f'{int(mem*0.6)}g') \
>     .config('spark.executor.pyspark.memory', f'{int(mem*0.4)}g') \
>     .config('spark.cores.max', w) \
>     .config('spark.driver.memory', '100g') \
>     .config('spark.sql.execution.arrow.pyspark.enabled', 'true') \
>     .getOrCreate()
> sdf0 = spark.createDataFrame(df_l).repartition(w).cache()
> sdf1 = spark.createDataFrame(df_r).repartition(w).cache()
> print("data loaded to spark", flush=True)
> try:
>     for i in range(it):
>         t1 = time.time()
>         out = sdf0.join(sdf1, on='col0', how='inner')
>         count = out.count()
>         t2 = time.time()
>         print(f"timings {r} 1 {i} {(t2 - t1) * 1000:.0f} ms, {count}", flush=True)
>         del out
>         del count
>         gc.collect()
> finally:
>     spark.stop()
> {code}
> {*}Cluster{*}: I am using a standalone Spark cluster of 15 nodes, each with
> 48 cores and 240GB RAM. The master and the driver code run on node1, while
> the other 14 nodes run workers allocated the maximum memory. In the Spark
> config, I reserve 90% of total memory for the executor, split 60% to the
> JVM and 40% to PySpark.
> {*}Issue{*}: When I run the above program, I can see that executors are
> being assigned to the app, but it doesn't move forward, even after 60 mins.
> For a smaller row count (10M) this worked without a problem. Driver output:
> {code}
> world sz 256 procs per worker 19 mem 216 iter 8
> Setting default log level to "WARN".
> To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use
> setLogLevel(newLevel).
> 22/08/26 14:52:22 WARN NativeCodeLoader: Unable to load native-hadoop library
> for your platform... using builtin-java classes where applicable
> /N/u/d/dnperera/.conda/envs/env/lib/python3.8/site-packages/pyspark/sql/pandas/conversion.py:425:
> UserWarning: createDataFrame attempted Arrow optimization because
> 'spark.sql.execution.arrow.pyspark.enabled' is set to true; however, failed
> by the reason below:
> Negative initial size: -589934400
> Attempting non-optimization as
> 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
> warn(msg) {code}
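The "Negative initial size: -589934400" in the warning looks consistent with a 32-bit size field overflowing while the driver serializes the whole pandas frame into Arrow batches. One hedged workaround, if the pandas frame must be kept rather than generated inside Spark: slice it before handing it to `createDataFrame`, so each Arrow payload stays well under the 2 GB mark (`chunk_bounds` and `chunked_create` are illustrative names, not Spark APIs):

```python
from functools import reduce

def chunk_bounds(n_rows, chunk_rows):
    """Half-open [start, end) slices covering n_rows."""
    return [(s, min(s + chunk_rows, n_rows))
            for s in range(0, n_rows, chunk_rows)]

def chunked_create(spark, pdf, chunk_rows=10_000_000):
    """Convert a large pandas DataFrame to Spark in fixed-size row
    slices, unioning the per-slice Spark DataFrames at the end."""
    parts = [spark.createDataFrame(pdf.iloc[s:e])
             for s, e in chunk_bounds(len(pdf), chunk_rows)]
    return reduce(lambda a, b: a.unionByName(b), parts)
```

This still funnels all data through the driver, so for a benchmark at this scale, generating the frames inside Spark (or reading them from Parquet in parallel) is the more idiomatic fix.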
--
This message was sent by Atlassian Jira
(v8.20.10#820010)