[
https://issues.apache.org/jira/browse/SPARK-15716?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yan Chen updated SPARK-15716:
-----------------------------
Description:
Code:
{code:java}
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
public class App {
  public static void main(String[] args) {
    final String input = args[0];
    final String check = args[1];
    final long interval = Long.parseLong(args[2]);
    final SparkConf conf = new SparkConf();
    conf.set("spark.streaming.minRememberDuration", "180s");
    conf.set("spark.streaming.receiver.writeAheadLog.enable", "true");
    conf.set("spark.streaming.unpersist", "true");
    conf.set("spark.streaming.ui.retainedBatches", "10");
    conf.set("spark.ui.retainedJobs", "10");
    conf.set("spark.ui.retainedStages", "10");
    conf.set("spark.worker.ui.retainedExecutors", "10");
    conf.set("spark.worker.ui.retainedDrivers", "10");
    conf.set("spark.sql.ui.retainedExecutions", "10");
    JavaStreamingContextFactory jscf = () -> {
      SparkContext sc = new SparkContext(conf);
      sc.setCheckpointDir(check);
      StreamingContext ssc = new StreamingContext(sc,
          Durations.milliseconds(interval));
      JavaStreamingContext jssc = new JavaStreamingContext(ssc);
      jssc.checkpoint(check);
      // setup pipeline here
      JavaPairDStream<LongWritable, Text> inputStream =
          jssc.fileStream(
              input,
              LongWritable.class,
              Text.class,
              TextInputFormat.class,
              (filepath) -> Boolean.TRUE, // accept every file
              false                       // newFilesOnly = false: also pick up existing files
          );
      // Identity update: the previous state is carried forward unchanged.
      JavaPairDStream<LongWritable, Text> usbk = inputStream
          .updateStateByKey((current, state) -> state);
      usbk.checkpoint(Durations.seconds(10));
      usbk.foreachRDD(rdd -> {
        rdd.count(); // force evaluation of the state RDD
        System.out.println("usbk: " + rdd.toDebugString().split("\n").length);
        return null; // foreachRDD takes Function<JavaRDD<...>, Void> in Spark 1.x
      });
      return jssc;
    };
    JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(check, jscf);
    jssc.start();
    jssc.awaitTermination();
  }
}
{code}
Command used to run the code:
{code:none}
spark-submit --keytab [keytab] --principal [principal] --class [package].App \
  --master yarn --driver-memory 1g --executor-memory 1g \
  --conf "spark.driver.maxResultSize=0" \
  --conf "spark.logConf=true" \
  --conf "spark.executor.instances=2" \
  --conf "spark.executor.extraJavaOptions=-XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark" \
  --conf "spark.driver.extraJavaOptions=-Xloggc:/[dir]/memory-gc.log -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark" \
  [jar-file-path] file:///[dir-on-nas-drive] [dir-on-hdfs] 200
{code}
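Driver memory was tracked via the GC log enabled above; heap occupancy can also be sampled on the fly with the JDK's own jstat (a suggested cross-check, with the driver pid as a placeholder):
{code:none}
# Sample heap occupancy and GC counts of the driver JVM every 10 seconds
jstat -gcutil [driver-pid] 10000
{code}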
It's a very simple piece of code, but when I run it, the driver's memory usage
keeps growing. There is no file input in our runs. The batch interval is set to
200 milliseconds; processing time per batch stays below 150 milliseconds, and
most batches complete in under 70 milliseconds.
!http://i.imgur.com/uSzUui6.png!
The rightmost four red triangles are full GCs, triggered manually with the
"jcmd [pid] GC.run" command.
> Memory usage keeps growing in Spark Streaming
> ---------------------------------------------
>
> Key: SPARK-15716
> URL: https://issues.apache.org/jira/browse/SPARK-15716
> Project: Spark
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 1.4.1
> Environment: Oracle Java 1.8.0_51, SUSE Linux
> Reporter: Yan Chen
> Original Estimate: 48h
> Remaining Estimate: 48h
>