YuNing Liu created SPARK-39290:
----------------------------------
Summary: Question about job division in "df.groupBy().applyInPandas"
Key: SPARK-39290
URL: https://issues.apache.org/jira/browse/SPARK-39290
Project: Spark
Issue Type: Question
Components: PySpark
Affects Versions: 3.2.1
Environment: python 3.8
pyspark 3.2.1
pyarrow 7.0.0
hadoop 3.3.2
Reporter: YuNing Liu
My program runs on Spark on YARN with four nodes in total. Both the number of
executors and the number of executor cores are set to 4. When I use
"df.groupBy().applyInPandas" to process my DataFrame, the work is always split
into two jobs: the first job contains only one task, and the second job
contains three tasks. The two jobs have identical DAGs and perform exactly the
same operations, only on different data. As a result, the execution time of my
program roughly doubles. I would expect a single job containing four tasks.
This problem has bothered me for a long time and I cannot find the cause. The
code below is a simple example I use to reproduce it. My DataFrame stores
information about image data saved on HDFS and has three columns: "id",
"path", and "category". "id" indicates the node on which the image is located,
"path" is the full path to the file, and "category" is the image's class
label.
{code:python}
import os

import pandas as pd
from pyspark.sql import SparkSession
from pyhdfs import HdfsClient

spark = SparkSession.builder.appName("test") \
    .config("spark.sql.shuffle.partitions", "40") \
    .config("spark.default.parallelism", "40") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .getOrCreate()


def process(key, paths):
    # Simulate a CPU-heavy per-group computation.
    a = len(paths.path)
    for i in range(100000000):
        a += 1
    a = str(a)
    return pd.DataFrame([key + (a,)])


if __name__ == '__main__':
    sc = spark.sparkContext
    client = HdfsClient(hosts="master:9870", user_name="hadoop")
    dataset_dir = "/Datasets"
    files_pd = pd.DataFrame()
    # Build one row per image file: [node id, full path, category index].
    for slave, per_dataset_dir in enumerate(client.listdir(dataset_dir)):
        child_path = os.path.join(dataset_dir, per_dataset_dir)
        files = pd.DataFrame([
            [slave,
             os.path.join(str(child_path), str(child_dir_name), str(filename)),
             index]
            for index, child_dir_name in enumerate(client.listdir(child_path))
            for filename in client.listdir(os.path.join(child_path, child_dir_name))
        ])
        files_pd = pd.concat([files_pd, files])
    # Shuffle the rows so that the groups are interleaved across the input.
    files_pd = files_pd.sample(frac=1).reset_index(drop=True)
    spark_files = spark.createDataFrame(files_pd, ("id", "path", "category"))
    result = spark_files.groupby("id").applyInPandas(process, schema="id long, path string")
    result.show()
{code}
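For reference, here is a minimal sketch (using the {{spark_files}} and
{{result}} DataFrames from the example above) of how the input partitioning
and the generated physical plan can be inspected when looking for the source
of the job split:
{code:python}
# Number of input partitions feeding the shuffle before applyInPandas;
# with four distinct "id" values, the grouped data should end up in at
# most four non-empty shuffle partitions.
print(spark_files.rdd.getNumPartitions())

# The extended physical plan shows FlatMapGroupsInPandas preceded by an
# Exchange (shuffle) on "id"; comparing it with the Spark UI may help
# show where the two jobs come from.
result.explain(True)
{code}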