[ https://issues.apache.org/jira/browse/SPARK-47650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
yinan zhan updated SPARK-47650:
-------------------------------
Description: 
The data in Partition 1 was executed twice. Here is the reproduction code; the issue occurs every time.

{code:java}
import json
import os
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("llm hard negs records") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "40g") \
    .config("spark.local.dir", "/tmp/pyspark") \
    .master(f"local[8]") \
    .getOrCreate()

def process_partition(index, partition):
    results = []
    s = 0
    for row in partition:
        results.append((row['Query'], row['Hard Negative Document'], row['Positive Document'], "C"))
        s += 1
    print(str(index) + "cool" + str(s))
    return results

def generate_fake_data(num_records, output_file_path):
    fake_data = [{
        "Query": f"Query {i}",
        "Hard Negative Document": f"Hard Negative Document {i}",
        "Positive Document": f"Positive Document {i}"
    } for i in range(num_records)]
    os.makedirs(os.path.dirname(output_file_path), exist_ok=True)
    with open(output_file_path, 'w') as f:
        for item in fake_data:
            f.write(json.dumps(item) + '\n')

num_records = 2000
file_path = '/tmp/fake_input_data.jsonl'
generate_fake_data(num_records, file_path)

df = spark.read.json(file_path).repartition(8)
results_rdd = df.rdd.mapPartitionsWithIndex(process_partition)
results_df = results_rdd.toDF(["Query", "Hard Negative Document", "Positive Document", "Result"])

output_path = "/tmp/bc_inputs6"
results_df.write.json(output_path, mode="overwrite")
{code}

was: (previous description: the same reproduction code, pasted with IDE {color} markup)
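A plausible cause (an assumption, not confirmed in this ticket): results_rdd.toDF([...]) supplies only column names, so Spark must sample the RDD to infer the field types, and that schema-inference job can execute a partition before the write job runs all eight partitions again. A minimal sketch that avoids the inference pass by declaring the schema explicitly; the StructType below is the editor's own and simply mirrors the four string columns produced by process_partition:

{code:python}
from pyspark.sql.types import StructType, StructField, StringType

# With an explicit schema, createDataFrame does not need to sample
# (i.e. execute) any partition just to infer the field types.
schema = StructType([
    StructField("Query", StringType(), True),
    StructField("Hard Negative Document", StringType(), True),
    StructField("Positive Document", StringType(), True),
    StructField("Result", StringType(), True),
])

results_df = spark.createDataFrame(results_rdd, schema)
results_df.write.json(output_path, mode="overwrite")
{code}

If inference is the culprit, the only job touching the partitions is then the write itself, and each "cool" line from process_partition should print exactly once per partition.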
{color:#6a8759}f"Positive Document {color}{color:#cc7832}{{color}i{color:#cc7832}}{color}{color:#6a8759}" {color}{color:#6a8759} {color}} {color:#cc7832}for {color}i {color:#cc7832}in {color}{color:#8888c6}range{color}(num_records)] os.makedirs(os.path.dirname(output_file_path){color:#cc7832}, {color}{color:#aa4926}exist_ok{color}={color:#cc7832}True{color}) {color:#cc7832}with {color}{color:#8888c6}open{color}(output_file_path{color:#cc7832}, {color}{color:#6a8759}'w'{color}) {color:#cc7832}as {color}f: {color:#cc7832}for {color}item {color:#cc7832}in {color}fake_data: f.write(json.dumps(item) + {color:#6a8759}'{color}{color:#cc7832}\n{color}{color:#6a8759}'{color}) num_records = {color:#6897bb}2000 {color}file_path = {color:#6a8759}'/tmp/fake_input_data.jsonl' {color}{color:#6a8759} {color}generate_fake_data(num_records{color:#cc7832}, {color}file_path) df = spark.read.json(file_path).repartition({color:#6897bb}8{color}) results_rdd = df.rdd.mapPartitionsWithIndex(process_partition) results_df = results_rdd.toDF([{color:#6a8759}"Query"{color}{color:#cc7832}, {color}{color:#6a8759}"Hard Negative Document"{color}{color:#cc7832}, {color}{color:#6a8759}"Positive Document"{color}{color:#cc7832}, {color}{color:#6a8759}"Result"{color}]) output_path = {color:#6a8759}"/tmp/bc_inputs6" {color}results_df.write.json(output_path{color:#cc7832}, {color}{color:#aa4926}mode{color}={color:#6a8759}"overwrite"{color}) > In local mode, Spark DataFrame cannot fully parallelize > ------------------------------------------------------- > > Key: SPARK-47650 > URL: https://issues.apache.org/jira/browse/SPARK-47650 > Project: Spark > Issue Type: Bug > Components: PySpark, SQL > Affects Versions: 3.5.1 > Reporter: yinan zhan > Priority: Critical > > The data in Partition 1 was executed twice. > Here is the reproduction code; the issue occurs every time. > > {code:java} > import json > import os > from pyspark.sql import SparkSession > spark = SparkSession.builder \ > .appName("llm hard negs records") \ > .config("spark.executor.memory", "4g") \ > .config("spark.driver.memory", "40g") \ > .config("spark.local.dir", "/tmp/pyspark") \ > .master(f"local[8]") \ > .getOrCreate() > def process_partition(index, partition): > results = [] > s = 0 > for row in partition: > results.append((row['Query'], row['Hard Negative Document'], > row['Positive Document'], "C")) > s += 1 > print(str(index) + "cool" + str(s)) > return results > def generate_fake_data(num_records, output_file_path): > fake_data = [{ > "Query": f"Query {i}", > "Hard Negative Document": f"Hard Negative Document {i}", > "Positive Document": f"Positive Document {i}" > } for i in range(num_records)] > os.makedirs(os.path.dirname(output_file_path), exist_ok=True) > with open(output_file_path, 'w') as f: > for item in fake_data: > f.write(json.dumps(item) + '\n') > num_records = 2000 > file_path = '/tmp/fake_input_data.jsonl' > generate_fake_data(num_records, file_path) > df = spark.read.json(file_path).repartition(8) > results_rdd = df.rdd.mapPartitionsWithIndex(process_partition) > results_df = results_rdd.toDF(["Query", "Hard Negative Document", "Positive > Document", "Result"]) > output_path = "/tmp/bc_inputs6" > results_df.write.json(output_path, mode="overwrite") {code} -- This message was sent by Atlassian Jira (v8.20.10#820010) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org