[ https://issues.apache.org/jira/browse/SPARK-39290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
YuNing Liu updated SPARK-39290:
-------------------------------
Attachment: WebUI.png
> Question of job division in "df.groupBy().applyInPandas"
> --------------------------------------------------------
>
> Key: SPARK-39290
> URL: https://issues.apache.org/jira/browse/SPARK-39290
> Project: Spark
> Issue Type: Question
> Components: PySpark
> Affects Versions: 3.2.1
> Environment: python 3.8
> pyspark 3.2.1
> pyarrow 7.0.0
> hadoop 3.3.2
> Reporter: YuNing Liu
> Priority: Major
> Attachments: WebUI.png
>
>
> My program runs on Spark on YARN with four nodes in total. My
> --num-executors is set to 4, and so is --executor-cores. When I use
> "df.groupBy().applyInPandas" to process my DataFrame, the computation is
> always divided into two jobs: the first job contains only one task, and the
> second contains three. The two jobs have identical DAG diagrams and perform
> exactly the same operations, only on different data. As a result, the
> execution time of my program roughly doubles. I would expect a single job
> containing four tasks. This problem has bothered me for a long time and I
> cannot find the cause. The code below is a simple example I use to
> reproduce it. My DataFrame holds the metadata of image files stored on
> HDFS, in three columns: "id", "path" and "category". "id" indicates the
> node on which the image is located, "path" the full path, and "category"
> the image class.
> {code:python}
> import os
>
> import pandas as pd
> from pyspark.sql import SparkSession
> from pyhdfs import HdfsClient
>
> spark = SparkSession.builder.appName("test"). \
>     config("spark.sql.shuffle.partitions", "40"). \
>     config("spark.default.parallelism", "40"). \
>     config("spark.sql.execution.arrow.pyspark.enabled", "true"). \
>     getOrCreate()
>
>
> def process(key, paths):
>     # Busy loop to simulate heavy per-group work, then return one row
>     # containing the group key and the final counter as a string.
>     a = len(paths.path)
>     for i in range(100000000):
>         a += 1
>     a = str(a)
>     return pd.DataFrame([key + (a,)])
>
>
> if __name__ == '__main__':
>     sc = spark.sparkContext
>     client = HdfsClient(hosts="master:9870", user_name="hadoop")
>     dataset_dir = "/Datasets"
>     files_pd = pd.DataFrame()
>     # Build one row per image: [node id, full HDFS path, category index].
>     for slave, per_dataset_dir in enumerate(client.listdir(dataset_dir)):
>         child_path = os.path.join(dataset_dir, per_dataset_dir)
>         files = pd.DataFrame([
>             [slave, os.path.join(str(child_path), str(child_dir_name), str(filename)), index]
>             for index, child_dir_name in enumerate(client.listdir(child_path))
>             for filename in client.listdir(os.path.join(child_path, child_dir_name))
>         ])
>         files_pd = pd.concat([files_pd, files])
>     # Shuffle the rows, then group by node id and apply the pandas UDF.
>     files_pd = files_pd.sample(frac=1).reset_index(drop=True)
>     spark_files = spark.createDataFrame(files_pd, ("id", "path", "category"))
>     result = spark_files.groupby("id").applyInPandas(process, schema="id long, path string")
>     result.show()
> {code}
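> For reference, here is a minimal sketch of the same groupBy().applyInPandas pattern with the HDFS listing replaced by a toy in-memory DataFrame; the four "id" groups and the fabricated paths below are made up for illustration, not my real data:
> {code:python}
> import pandas as pd
> from pyspark.sql import SparkSession
>
> spark = SparkSession.builder.appName("test-minimal"). \
>     config("spark.sql.execution.arrow.pyspark.enabled", "true"). \
>     getOrCreate()
>
>
> def process(key, pdf):
>     # One output row per group: the group key plus its row count,
>     # mirroring the "id long, path string" schema of the real example.
>     return pd.DataFrame([key + (str(len(pdf)),)])
>
>
> if __name__ == '__main__':
>     # Four "id" groups of ten fabricated image paths each.
>     pdf = pd.DataFrame(
>         [[i, f"/Datasets/node{i}/img{j}.png", j] for i in range(4) for j in range(10)],
>         columns=["id", "path", "category"],
>     )
>     df = spark.createDataFrame(pdf)
>     result = df.groupby("id").applyInPandas(process, schema="id long, path string")
>     result.show()
> {code}
> If this toy version also splits into two jobs, the behavior would be independent of the HDFS listing step.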