[ 
https://issues.apache.org/jira/browse/SPARK-47650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yinan zhan updated SPARK-47650:
-------------------------------
    Description: 
The data in partition 1 is processed twice.

Here is the reproduction code; the issue occurs every time.

 
{code:python}
import json
import os

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("llm hard negs records") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "40g") \
    .config("spark.local.dir", "/tmp/pyspark") \
    .master("local[8]") \
    .getOrCreate()


def process_partition(index, partition):
    results = []
    s = 0

    for row in partition:
        results.append((row['Query'], row['Hard Negative Document'],
                        row['Positive Document'], "C"))
        s += 1

    # Log the partition index and the number of rows it processed.
    print(str(index) + "cool" + str(s))
    return results

def generate_fake_data(num_records, output_file_path):
    fake_data = [{
        "Query": f"Query {i}",
        "Hard Negative Document": f"Hard Negative Document {i}",
        "Positive Document": f"Positive Document {i}"
    } for i in range(num_records)]

    os.makedirs(os.path.dirname(output_file_path), exist_ok=True)

    with open(output_file_path, 'w') as f:
        for item in fake_data:
            f.write(json.dumps(item) + '\n')

num_records = 2000
file_path = '/tmp/fake_input_data.jsonl'

generate_fake_data(num_records, file_path)

df = spark.read.json(file_path).repartition(8)

results_rdd = df.rdd.mapPartitionsWithIndex(process_partition)
results_df = results_rdd.toDF(["Query", "Hard Negative Document", "Positive Document", "Result"])

output_path = "/tmp/bc_inputs6"
results_df.write.json(output_path, mode="overwrite") {code}



> In local mode, Spark DataFrame cannot fully parallelize
> -------------------------------------------------------
>
>                 Key: SPARK-47650
>                 URL: https://issues.apache.org/jira/browse/SPARK-47650
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, SQL
>    Affects Versions: 3.5.1
>            Reporter: yinan zhan
>            Priority: Critical
>
> The data in Partition 1 was executed twice.
> Here is the reproduction code; the issue occurs every time.
>  
> {code:java}
> import json
> import os
> from pyspark.sql import SparkSession
> spark = SparkSession.builder \
>     .appName("llm hard negs records") \
>     .config("spark.executor.memory", "4g") \
>     .config("spark.driver.memory", "40g") \
>     .config("spark.local.dir", "/tmp/pyspark") \
>     .master(f"local[8]") \
>     .getOrCreate()
> def process_partition(index, partition):
>     results = []
>     s = 0
>     for row in partition:
>         results.append((row['Query'], row['Hard Negative Document'],
>                         row['Positive Document'], "C"))
>         s += 1
>     print(str(index) + "cool" + str(s))
>     return results
> def generate_fake_data(num_records, output_file_path):
>     fake_data = [{
>         "Query": f"Query {i}",
>         "Hard Negative Document": f"Hard Negative Document {i}",
>         "Positive Document": f"Positive Document {i}"
>     } for i in range(num_records)]
>     os.makedirs(os.path.dirname(output_file_path), exist_ok=True)
>     with open(output_file_path, 'w') as f:
>         for item in fake_data:
>             f.write(json.dumps(item) + '\n')
> num_records = 2000
> file_path = '/tmp/fake_input_data.jsonl'
> generate_fake_data(num_records, file_path)
> df = spark.read.json(file_path).repartition(8)
> results_rdd = df.rdd.mapPartitionsWithIndex(process_partition)
> results_df = results_rdd.toDF(["Query", "Hard Negative Document", "Positive Document", "Result"])
> output_path = "/tmp/bc_inputs6"
> results_df.write.json(output_path, mode="overwrite") {code}



