Re: [SS] Any way to optimize memory consumption of SS?
There are expected to be about 5 million UUIDs in a day. I need this field to drop duplicate records and then count them. If I simply count without using dropDuplicates, the job occupies less than 1g of memory, so I believe most of the memory is occupied by the state store keeping the state for dropDuplicates. But I cannot find a way to alleviate the problem.

On Fri, Sep 15, 2017 at 3:35 AM, Michael Armbrust wrote:
> How many UUIDs do you expect to have in a day? That is likely where all
> the memory is being used. Does it work without that?
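The scaling described above, where memory grows with the number of distinct UUIDs, follows from how a watermarked dropDuplicates must keep one state entry per distinct key until the watermark passes that key's event time. A minimal pure-Scala sketch of that bookkeeping (a simplified model for illustration, not Spark's actual state store implementation; all names here are mine):

```scala
object DedupStateSketch {
  import scala.collection.mutable

  // One state entry per distinct (uuid, date) key -- with a 1-day watermark
  // this is what grows to millions of entries and dominates memory.
  private val state = mutable.Map.empty[(String, Long), Long]

  def stateSize: Int = state.size

  /** Returns true if the record is new (kept), false if it is dropped. */
  def process(uuid: String, date: Long, watermark: Long): Boolean = {
    // Eviction: entries whose event time has fallen behind the watermark
    // are finally removed -- until then, every distinct key is retained.
    val expired = state.collect { case (k, t) if t < watermark => k }
    expired.foreach(state.remove)
    val key = (uuid, date)
    if (date < watermark) false               // late record, dropped
    else if (state.contains(key)) false       // duplicate, dropped
    else { state.update(key, date); true }    // new key, state grows by one
  }
}
```

With a 1-day watermark, eviction only relieves memory a day after a key's event time, so the state must hold roughly a full day's distinct UUIDs at all times.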
Re: [SS] Any way to optimize memory consumption of SS?
How many UUIDs do you expect to have in a day? That is likely where all the memory is being used. Does it work without that?

On Tue, Sep 12, 2017 at 8:42 PM, 张万新 wrote:
> *Yes, my code is shown below(I also post my code in another mail)*
Re: [SS] Any way to optimize memory consumption of SS?
Yes, my code is shown below (I also posted it in another mail):

/**
 * input
 */
val logs = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", BROKER_SERVER)
  .option("subscribe", TOPIC)
  .option("startingOffsets", "latest")  // note: the option name is "startingOffsets"
  .load()

/**
 * process
 */
val logValues = logs.selectExpr("CAST(value AS STRING)").as[String]

val events = logValues
  .map(parseFunction)
  .select(
    $"_1".alias("date").cast("timestamp"),
    $"_2".alias("uuid").cast("string")
  )

val results = events
  .withWatermark("date", "1 day")
  .dropDuplicates("uuid", "date")
  .groupBy($"date")
  .count()

/**
 * output
 */
val query = results
  .writeStream
  .outputMode("update")
  .format("console")
  .option("truncate", "false")
  .trigger(Trigger.ProcessingTime("1 second"))
  .start()

query.awaitTermination()

And I use play-json to parse the input logs from Kafka; the parse function looks like:

def parseFunction(str: String): (Long, String) = {
  val json = Json.parse(str)
  val timestamp = (json \ "time").get.toString().toLong
  // truncate to the start of the day (UTC+8 label), in epoch seconds
  val date = (timestamp / (60 * 60 * 24) * 24 - 8) * 60 * 60
  val uuid = (json \ "uuid").get.toString()
  (date, uuid)
}

And the Java heap space looks like this (I've increased the executor memory to 15g):

[image: image.png]

On Wed, Sep 13, 2017 at 2:23 AM, Michael Armbrust wrote:
> Can you show the full query you are running?
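An aside on the arithmetic in parseFunction: `timestamp / (60 * 60 * 24)` truncates to a UTC day index, and the `- 8` shifts the resulting label back by 8 hours. A standalone check (the function name `truncateToDayUtc8` is mine, extracted for illustration):

```scala
// The same expression as in parseFunction, extracted for a standalone check.
def truncateToDayUtc8(timestamp: Long): Long =
  (timestamp / (60 * 60 * 24) * 24 - 8) * 60 * 60

// 1505260800L is 2017-09-13 00:00:00 UTC; the result, 1505232000L, is
// 2017-09-12 16:00 UTC, i.e. midnight of 2017-09-13 in UTC+8.
val date = truncateToDayUtc8(1505260800L)
```

Note that the bucket boundary is still determined by the UTC day index (the division happens before the `- 8`), so records are grouped by UTC day and only the label is shifted to UTC+8 midnight; if grouping by UTC+8 day was intended, the 8-hour offset would need to be added before the division.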
Re: [SS] Any way to optimize memory consumption of SS?
Can you show the full query you are running?

On Tue, Sep 12, 2017 at 10:11 AM, 张万新 wrote:
> Hi, I'm using structured streaming to count unique visits of our website.
[SS] Any way to optimize memory consumption of SS?
Hi,

I'm using Structured Streaming to count unique visits to our website. I run Spark on YARN with 4 executor instances and have tried from 2 cores * 5g of memory up to 4 cores * 10g per executor, but there are frequent full GCs, and once the count rises to more than about 4.5 million the application blocks and finally crashes with an OOM. That seems unreasonable, so is there any suggestion for optimizing the memory consumption of SS? Thanks.
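The numbers in this thread can be sanity-checked with a rough back-of-envelope sketch. The ~300 bytes per state entry below is an assumption for illustration, not a measured Spark figure:

```scala
// Back-of-envelope sketch: assume ~300 bytes per dropDuplicates state entry
// (key row with a ~36-char UUID string plus a timestamp, value row, and
// hash-map overhead) -- an assumption, not a measured figure.
val bytesPerEntry = 300L
val distinctKeysPerDay = 5000000L        // ~5 million UUIDs/day, per the thread
val stateBytes = bytesPerEntry * distinctKeysPerDay
val stateGb = stateBytes.toDouble / (1024 * 1024 * 1024)
// roughly 1.4 GB of raw state per retained day
```

That is only the raw state for one day's keys; JVM object overhead and the multiple state versions the store can retain in memory for snapshots may multiply it several times over, which would be consistent with an executor heap filling up around the 4.5 million mark.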