[ https://issues.apache.org/jira/browse/SPARK-24961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16562359#comment-16562359 ]

Markus Breuer commented on SPARK-24961:
---------------------------------------

I think it is an issue, and I explained why. I also posted the problem to Stack Overflow 
a few days ago and wrote to the mailing list, but got no feedback. Perhaps there is no 
easy answer, but the example should at least make the issue easy to reproduce.

> sort operation causes out of memory 
> ------------------------------------
>
>                 Key: SPARK-24961
>                 URL: https://issues.apache.org/jira/browse/SPARK-24961
>             Project: Spark
>          Issue Type: Bug
>          Components: Java API
>    Affects Versions: 2.3.1
>         Environment: Java 1.8u144+
> Windows 10
> Spark 2.3.1 in local mode
> -Xms4g -Xmx4g
> optional: -XX:+UseParallelOldGC 
>            Reporter: Markus Breuer
>            Priority: Major
>
> A sort operation on a large RDD - one that does not fit in memory - causes an out of 
> memory exception. The effect is reproducible with the sample below, which creates row 
> objects of about 2 MB each; the OOM occurs when the sorted result is saved. I tried 
> several StorageLevels, but whenever memory is involved (MEMORY_AND_DISK, 
> MEMORY_AND_DISK_SER, or no persist at all) the application runs out of memory. Only 
> DISK_ONLY seems to work.
> When sort() is replaced with sortWithinPartitions(), no StorageLevel is required and 
> the application succeeds (a sketch of this variant follows the example below).
> {code:java}
> package de.bytefusion.examples;
>
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapred.SequenceFileOutputFormat;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.JavaSparkContext;
> import org.apache.spark.sql.Dataset;
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.RowFactory;
> import org.apache.spark.sql.SparkSession;
> import org.apache.spark.sql.types.DataTypes;
> import org.apache.spark.sql.types.StructType;
> import org.apache.spark.storage.StorageLevel;
> import scala.Tuple2;
>
> import java.util.ArrayList;
> import java.util.List;
> import java.util.UUID;
>
> import static org.apache.spark.sql.functions.*;
>
> public class Example3 {
>
>     public static void main(String... args) {
>         // create spark session
>         SparkSession spark = SparkSession.builder()
>                 .appName("example1")
>                 .master("local[4]")
>                 .config("spark.driver.maxResultSize", "1g")
>                 .config("spark.driver.memory", "512m")
>                 .config("spark.executor.memory", "512m")
>                 .config("spark.local.dir", "d:/temp/spark-tmp")
>                 .getOrCreate();
>         JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
>
>         // base to generate huge data
>         List<Integer> list = new ArrayList<>();
>         for (int val = 1; val < 10000; val++) {
>             list.add(val);
>         }
>
>         // create simple rdd of int
>         JavaRDD<Integer> rdd = sc.parallelize(list, 200);
>
>         // use map to create one large (~2 MB) string per row
>         JavaRDD<Row> rowRDD =
>                 rdd
>                         .map(value -> RowFactory.create(
>                                 String.valueOf(value),
>                                 createLongText(UUID.randomUUID().toString(), 2 * 1024 * 1024)))
>                         // no persist => out of memory exception on write()
>                         // persist MEMORY_AND_DISK => out of memory exception on write()
>                         // persist MEMORY_AND_DISK_SER => out of memory exception on write()
>                         // persist(StorageLevel.DISK_ONLY())
>                 ;
>
>         StructType type = new StructType()
>                 .add("c1", DataTypes.StringType)
>                 .add("c2", DataTypes.StringType);
>
>         Dataset<Row> df = spark.createDataFrame(rowRDD, type);
>
>         // works
>         df.show();
>
>         // global sort on c1
>         df = df.sort(col("c1").asc());
>         df.explain();
>
>         // takes a lot of time but works
>         df.show();
>
>         // OutOfMemoryError: java heap space
>         df
>                 .write()
>                 .mode("overwrite")
>                 .csv("d:/temp/my.csv");
>
>         // OutOfMemoryError: java heap space
>         df
>                 .toJavaRDD()
>                 .mapToPair(row -> new Tuple2<>(new Text(row.getString(0)), new Text(row.getString(1))))
>                 .saveAsHadoopFile("d:\\temp\\foo", Text.class, Text.class, SequenceFileOutputFormat.class);
>     }
>
>     // repeat the given text until the result is at least minLength characters long
>     private static String createLongText(String text, int minLength) {
>         StringBuilder sb = new StringBuilder();
>         while (sb.length() < minLength) {
>             sb.append(text);
>         }
>         return sb.toString();
>     }
> }
> {code}
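> For comparison, the sortWithinPartitions() variant mentioned above can be sketched as 
> follows. This is a sketch only: it assumes the spark session, rowRDD and type from the 
> example, and the output path is a placeholder. Because each partition is sorted locally 
> instead of performing a global sort, the job completes without any explicit StorageLevel.
> {code:java}
> // Sketch only - assumes spark, rowRDD and type from the example above;
> // the output path is a placeholder.
> Dataset<Row> partiallySorted = spark
>         .createDataFrame(rowRDD, type)
>         .sortWithinPartitions(col("c1").asc());
>
> // Completes without any explicit StorageLevel in the reported setup.
> partiallySorted
>         .write()
>         .mode("overwrite")
>         .csv("d:/temp/my-sorted-within-partitions.csv");
> {code}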
> When using StorageLevel.MEMORY_AND_DISK or MEMORY_AND_DISK_SER, an OOM crashes the 
> application at around partition 70, with heap usage at about 3 GB of the 4 GB available.
> It seems sort() does something similar to collect(): a heap dump shows a very large 
> array of arrays - possibly the partition contents. spark.driver.maxResultSize is also 
> involved, since the sort exceeds its default value; setting it to unlimited still ends 
> in an OOM.
> Why do I think this is a bug?
>  # sort() should not be subject to spark.driver.maxResultSize.
>  # MEMORY_AND_DISK should work in any case - at the very least, disk should be used. 
> Instead, the OOM occurs when heap usage reaches 3 GB of the 4 GB total.
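> For reference, the two experiments described above correspond to sketches like the 
> following (names reused from the example; the output path is a placeholder). Setting 
> spark.driver.maxResultSize to "0" (unlimited) still ends in the OOM, while persisting 
> the row RDD with DISK_ONLY before the global sort is the only combination that completed.
> {code:java}
> // 1) maxResultSize experiment: "0" disables the driver result-size limit,
> //    yet the sort still fails with java.lang.OutOfMemoryError here.
> SparkSession spark = SparkSession.builder()
>         .appName("example1")
>         .master("local[4]")
>         .config("spark.driver.maxResultSize", "0")   // 0 = unlimited
>         .config("spark.driver.memory", "512m")
>         .config("spark.executor.memory", "512m")
>         .config("spark.local.dir", "d:/temp/spark-tmp")
>         .getOrCreate();
>
> // 2) DISK_ONLY experiment: persisting the large-row RDD to disk before the
> //    global sort is the only StorageLevel that avoided the OOM.
> JavaRDD<Row> persistedRDD = rowRDD.persist(StorageLevel.DISK_ONLY());
> Dataset<Row> sorted = spark.createDataFrame(persistedRDD, type)
>         .sort(col("c1").asc());
> sorted.write().mode("overwrite").csv("d:/temp/my-disk-only.csv");
> {code}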


