[ https://issues.apache.org/jira/browse/SPARK-47650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

yinan zhan updated SPARK-47650:
-------------------------------
    Description: 
import json
import os
import time

from pyspark.sql import SparkSession

num_gpus = 8

spark = SparkSession.builder \
    .appName("llm hard negs records") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "40g") \
    .config("spark.local.dir", "/tmp/pyspark") \
    .master(f"local[{num_gpus}]") \
    .getOrCreate()


def process_partition(index, partition):
    device_id = index % num_gpus
    device = f"cuda:{device_id}"
    print(device)
    time.sleep(10)
    results = []
    s = 0

    for row in partition:
        results.append((row['Query'], row['Hard Negative Document'], row['Positive Document'], "C"))
        s += 1

    print(str(index) + "cool" + str(s))
    return results

def generate_fake_data(num_records, output_file_path):
    fake_data = [{
        "Query": f"Query {i}",
        "Hard Negative Document": f"Hard Negative Document {i}",
        "Positive Document": f"Positive Document {i}"
    } for i in range(num_records)]

    os.makedirs(os.path.dirname(output_file_path), exist_ok=True)

    with open(output_file_path, 'w') as f:
        for item in fake_data:
            f.write(json.dumps(item) + '\n')

num_records = 2000
file_path = '/tmp/fake_input_data.jsonl'

generate_fake_data(num_records, file_path)

df = spark.read.json(file_path).repartition(num_gpus)

results_rdd = df.rdd.mapPartitionsWithIndex(process_partition)


results_df = results_rdd.toDF(["Query", "Hard Negative Document", "Positive Document", "Result"])

output_path = "/tmp/bc_inputs6"
results_df.write.json(output_path, mode="overwrite") 

  was:
With a parallelism level of 8 in PySpark local mode, the DataFrame version of this job runs one task first; the other seven tasks wait for that task to complete before they can execute in parallel. If the same functionality is implemented directly with the RDD API, the issue does not occur.

The reproduction code is below; a sketch of the RDD-only variant follows it.

{code:python}
import json
import os
import time

from pyspark.sql import SparkSession

num_gpus = 8

spark = SparkSession.builder \
    .appName("llm hard negs records") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "40g") \
    .config("spark.local.dir", "/tmp/pyspark") \
    .master(f"local[{num_gpus}]") \
    .getOrCreate()


def process_partition(index, partition):
    device_id = index % num_gpus
    device = f"cuda:{device_id}"
    print(device)
    time.sleep(10)
    results = []

    for row in partition:
        results.append((row['Query'], row['Hard Negative Document'], row['Positive Document'], "C"))

    print("cool")
    return results

def generate_fake_data(num_records, output_file_path):
    fake_data = [{
        "Query": f"Query {i}",
        "Hard Negative Document": f"Hard Negative Document {i}",
        "Positive Document": f"Positive Document {i}"
    } for i in range(num_records)]

    os.makedirs(os.path.dirname(output_file_path), exist_ok=True)

    with open(output_file_path, 'w') as f:
        for item in fake_data:
            f.write(json.dumps(item) + '\n')

num_records = 2000
file_path = '/tmp/fake_input_data.jsonl'

generate_fake_data(num_records, file_path)

df = spark.read.json(file_path).repartition(num_gpus)

results_rdd = df.rdd.mapPartitionsWithIndex(process_partition)


results_df = results_rdd.toDF(["Query", "Hard Negative Document", "Positive Document", "Result"])

output_path = "/tmp/bc_inputs6"
results_df.write.json(output_path)
{code}

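For comparison, here is a minimal sketch of the RDD-only variant that the description says avoids this behavior. It is an illustration rather than the reporter's original code: it reads the same fake JSONL input through sparkContext.textFile and json.loads instead of spark.read.json, and the app name and output path (/tmp/bc_inputs6_rdd) are placeholders.

{code:python}
import json
import time

from pyspark.sql import SparkSession

num_gpus = 8

spark = SparkSession.builder \
    .appName("llm hard negs records rdd") \
    .master(f"local[{num_gpus}]") \
    .getOrCreate()

def process_partition(index, partition):
    # Same per-partition work as in the DataFrame reproduction above.
    device = f"cuda:{index % num_gpus}"
    print(device)
    time.sleep(10)
    return [(row['Query'], row['Hard Negative Document'],
             row['Positive Document'], "C") for row in partition]

# Read and parse the JSONL file with the RDD API instead of spark.read.json.
lines = spark.sparkContext.textFile('/tmp/fake_input_data.jsonl')
rows = lines.map(json.loads).repartition(num_gpus)

results_rdd = rows.mapPartitionsWithIndex(process_partition)
results_df = results_rdd.toDF(["Query", "Hard Negative Document",
                               "Positive Document", "Result"])
results_df.write.json("/tmp/bc_inputs6_rdd", mode="overwrite")
{code}

Running both versions and comparing task start times in the Spark UI should show whether the stagger is specific to the DataFrame path.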

> In local mode, Spark DataFrame cannot fully parallelize
> -------------------------------------------------------
>
>                 Key: SPARK-47650
>                 URL: https://issues.apache.org/jira/browse/SPARK-47650
>             Project: Spark
>          Issue Type: Improvement
>          Components: PySpark, SQL
>    Affects Versions: 3.5.1
>            Reporter: yinan zhan
>            Priority: Minor
>


