[ https://issues.apache.org/jira/browse/SPARK-47650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
yinan zhan updated SPARK-47650: ------------------------------- Description: import json import os import time from pyspark.sql import SparkSession num_gpus = 8 spark = SparkSession.builder \ .appName("llm hard negs records") \ .config("spark.executor.memory", "4g") \ .config("spark.driver.memory", "40g") \ .config("spark.local.dir", "/tmp/pyspark") \ .master(f"local[{num_gpus}]") \ .getOrCreate() def process_partition(index, partition): device_id = index % num_gpus device = f"cuda:{device_id}" print(device) time.sleep(10) results = [] s = 0 for row in partition: results.append((row['Query'], row['Hard Negative Document'], row['Positive Document'], "C")) s += 1 print(str(index) + "cool" + str(s)) return results def generate_fake_data(num_records, output_file_path): fake_data = [{ "Query": f"Query {i}", "Hard Negative Document": f"Hard Negative Document {i}", "Positive Document": f"Positive Document {i}" } for i in range(num_records)] os.makedirs(os.path.dirname(output_file_path), exist_ok=True) with open(output_file_path, 'w') as f: for item in fake_data: f.write(json.dumps(item) + '\n') num_records = 2000 file_path = '/tmp/fake_input_data.jsonl' generate_fake_data(num_records, file_path) df = spark.read.json(file_path).repartition(num_gpus) results_rdd = df.rdd.mapPartitionsWithIndex(process_partition) results_df = results_rdd.toDF(["Query", "Hard Negative Document", "Positive Document", "Result"]) output_path = "/tmp/bc_inputs6" results_df.write.json(output_path, mode="overwrite") was: With a parallelism level of 8, in PySpark local mode, when using DataFrame, there will be one task that executes first. The other seven tasks wait for this task to complete before they can execute in parallel. If the same functionality is implemented using RDD mode, this issue does not occur. Below is the reproduction code. 
{code:java} import json import os import time from pyspark.sql import SparkSession num_gpus = 8 spark = SparkSession.builder \ .appName("llm hard negs records") \ .config("spark.executor.memory", "4g") \ .config("spark.driver.memory", "40g") \ .config("spark.local.dir", "/tmp/pyspark") \ .master(f"local[{num_gpus}]") \ .getOrCreate() def process_partition(index, partition): device_id = index % num_gpus device = f"cuda:{device_id}" print(device) time.sleep(10) results = [] for row in partition: results.append((row['Query'], row['Hard Negative Document'], row['Positive Document'], "C")) print("cool") return results def generate_fake_data(num_records, output_file_path): fake_data = [{ "Query": f"Query {i}", "Hard Negative Document": f"Hard Negative Document {i}", "Positive Document": f"Positive Document {i}" } for i in range(num_records)] os.makedirs(os.path.dirname(output_file_path), exist_ok=True) with open(output_file_path, 'w') as f: for item in fake_data: f.write(json.dumps(item) + '\n') num_records = 2000 file_path = '/tmp/fake_input_data.jsonl' generate_fake_data(num_records, file_path) df = spark.read.json(file_path).repartition(num_gpus) results_rdd = df.rdd.mapPartitionsWithIndex(process_partition) results_df = results_rdd.toDF(["Query", "Hard Negative Document", "Positive Document", "Result"]) output_path = "/tmp/bc_inputs6" results_df.write.json(output_path) {code} > In local mode, Spark DataFrame cannot fully parallelize > ------------------------------------------------------- > > Key: SPARK-47650 > URL: https://issues.apache.org/jira/browse/SPARK-47650 > Project: Spark > Issue Type: Improvement > Components: PySpark, SQL > Affects Versions: 3.5.1 > Reporter: yinan zhan > Priority: Minor > > import json > import os > import time > from pyspark.sql import SparkSession > num_gpus = 8 > spark = SparkSession.builder \ > .appName("llm hard negs records") \ > .config("spark.executor.memory", "4g") \ > .config("spark.driver.memory", "40g") \ > 
.config("spark.local.dir", "/tmp/pyspark") \ > .master(f"local[{num_gpus}]") \ > .getOrCreate() > def process_partition(index, partition): > device_id = index % num_gpus > device = f"cuda:{device_id}" > print(device) > time.sleep(10) > results = [] > s = 0 > for row in partition: > results.append((row['Query'], row['Hard Negative Document'], > row['Positive Document'], "C")) > s += 1 > print(str(index) + "cool" + str(s)) > return results > def generate_fake_data(num_records, output_file_path): > fake_data = [{ > "Query": f"Query {i}", > "Hard Negative Document": f"Hard Negative Document {i}", > "Positive Document": f"Positive Document {i}" > } for i in range(num_records)] > os.makedirs(os.path.dirname(output_file_path), exist_ok=True) > with open(output_file_path, 'w') as f: > for item in fake_data: > f.write(json.dumps(item) + '\n') > num_records = 2000 > file_path = '/tmp/fake_input_data.jsonl' > generate_fake_data(num_records, file_path) > df = spark.read.json(file_path).repartition(num_gpus) > results_rdd = df.rdd.mapPartitionsWithIndex(process_partition) > results_df = results_rdd.toDF(["Query", "Hard Negative Document", "Positive > Document", "Result"]) > output_path = "/tmp/bc_inputs6" > results_df.write.json(output_path, mode="overwrite") -- This message was sent by Atlassian Jira (v8.20.10#820010) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org