Re: [SS] Any way to optimize memory consumption of SS?

2017-09-14 Thread 张万新
There are expected to be about 5 million UUIDs in a day. I need to use this
field to drop duplicate records before counting. If I simply count records
without dropDuplicates, the job uses less than 1 GB of memory, so I believe
most of the memory is occupied by the state store that keeps the state for
dropDuplicates. But I cannot find a way to alleviate the problem.
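
For context, the only alternative I can think of is an approximate distinct
count, which avoids keeping per-UUID state. This is a rough, untested sketch
(it changes the semantics; the column names follow the query quoted below,
0.01 is just an assumed error tolerance, and it assumes a Spark version that
has approx_count_distinct):

  import org.apache.spark.sql.functions.approx_count_distinct

  // approximate unique count per day; no per-UUID rows in the state store
  val approxResults = events
    .withWatermark("date", "1 day")
    .groupBy($"date")
    .agg(approx_count_distinct($"uuid", 0.01).alias("approx_uv"))  // 1% relative error, assumed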

Michael Armbrust wrote on Fri, Sep 15, 2017 at 3:35 AM:

> How many UUIDs do you expect to have in a day?  That is likely where all
> the memory is being used.  Does it work without that?
>
> On Tue, Sep 12, 2017 at 8:42 PM, 张万新  wrote:
>
>> *Yes, my code is shown below (I also posted it in another mail)*
>> /**
>> * input
>> */
>>   val logs = spark
>> .readStream
>> .format("kafka")
>> .option("kafka.bootstrap.servers", BROKER_SERVER)
>> .option("subscribe", TOPIC)
>> .option("startingOffset", "latest")
>> .load()
>>
>>   /**
>> * process
>> */
>>   val logValues = logs.selectExpr("CAST(value AS STRING)").as[(String)]
>>
>>   val events = logValues
>> .map(parseFunction)
>> .select(
>>   $"_1".alias("date").cast("timestamp"),
>>   $"_2".alias("uuid").cast("string")
>> )
>>
>>   val results = events
>> .withWatermark("date", "1 day")
>> .dropDuplicates("uuid", "date")
>> .groupBy($"date")
>> .count()
>>
>>   /**
>> * output
>> */
>>   val query = results
>> .writeStream
>> .outputMode("update")
>> .format("console")
>> .option("truncate", "false")
>> .trigger(Trigger.ProcessingTime("1 seconds"))
>> .start()
>>
>>   query.awaitTermination()
>>
>> *and I use Play JSON to parse the input logs from Kafka; the parse
>> function looks like this:*
>>
>>   def parseFunction(str: String): (Long, String) = {
>> val json = Json.parse(str)
>> val timestamp = (json \ "time").get.toString().toLong
>> val date = (timestamp / (60 * 60 * 24) * 24 -8) * 60 * 60
>> val uuid = (json \ "uuid").get.toString()
>> (date, uuid)
>>   }
>>
>> and the Java heap space looks like this (I've increased the executor
>> memory to 15g):
>>
>> [image: image.png]
>> Michael Armbrust wrote on Wed, Sep 13, 2017 at 2:23 AM:
>>
>>> Can you show the full query you are running?
>>>
>>> On Tue, Sep 12, 2017 at 10:11 AM, 张万新  wrote:
>>>
 Hi,

 I'm using Structured Streaming to count unique visits to our website. I
 run Spark on YARN with 4 executor instances and have tried from 2 cores *
 5g memory up to 4 cores * 10g memory per executor, but there are frequent
 full GCs, and once the count rises to more than about 4.5 million the
 application blocks and finally crashes with an OOM. That seems
 unreasonable. So is there any suggestion for optimizing the memory
 consumption of SS? Thanks.

>>>
>>>
>


Re: [SS] Any way to optimize memory consumption of SS?

2017-09-14 Thread Michael Armbrust
How many UUIDs do you expect to have in a day?  That is likely where all
the memory is being used.  Does it work without that?

On Tue, Sep 12, 2017 at 8:42 PM, 张万新  wrote:

> *Yes, my code is shown below (I also posted it in another mail)*
> /**
> * input
> */
>   val logs = spark
> .readStream
> .format("kafka")
> .option("kafka.bootstrap.servers", BROKER_SERVER)
> .option("subscribe", TOPIC)
> .option("startingOffset", "latest")
> .load()
>
>   /**
> * process
> */
>   val logValues = logs.selectExpr("CAST(value AS STRING)").as[(String)]
>
>   val events = logValues
> .map(parseFunction)
> .select(
>   $"_1".alias("date").cast("timestamp"),
>   $"_2".alias("uuid").cast("string")
> )
>
>   val results = events
> .withWatermark("date", "1 day")
> .dropDuplicates("uuid", "date")
> .groupBy($"date")
> .count()
>
>   /**
> * output
> */
>   val query = results
> .writeStream
> .outputMode("update")
> .format("console")
> .option("truncate", "false")
> .trigger(Trigger.ProcessingTime("1 seconds"))
> .start()
>
>   query.awaitTermination()
>
> *and I use Play JSON to parse the input logs from Kafka; the parse function
> looks like this:*
>
>   def parseFunction(str: String): (Long, String) = {
> val json = Json.parse(str)
> val timestamp = (json \ "time").get.toString().toLong
> val date = (timestamp / (60 * 60 * 24) * 24 -8) * 60 * 60
> val uuid = (json \ "uuid").get.toString()
> (date, uuid)
>   }
>
> and the Java heap space looks like this (I've increased the executor memory to 15g):
>
> [image: image.png]
> Michael Armbrust wrote on Wed, Sep 13, 2017 at 2:23 AM:
>
>> Can you show the full query you are running?
>>
>> On Tue, Sep 12, 2017 at 10:11 AM, 张万新  wrote:
>>
>>> Hi,
>>>
>>> I'm using Structured Streaming to count unique visits to our website. I
>>> run Spark on YARN with 4 executor instances and have tried from 2 cores *
>>> 5g memory up to 4 cores * 10g memory per executor, but there are frequent
>>> full GCs, and once the count rises to more than about 4.5 million the
>>> application blocks and finally crashes with an OOM. That seems
>>> unreasonable. So is there any suggestion for optimizing the memory
>>> consumption of SS? Thanks.
>>>
>>
>>


Re: [SS] Any way to optimize memory consumption of SS?

2017-09-12 Thread 张万新
*Yes, my code is shown below (I also posted it in another mail)*
  /**
   * input
   */
  val logs = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", BROKER_SERVER)
    .option("subscribe", TOPIC)
    .option("startingOffsets", "latest")
    .load()

  /**
   * process
   */
  val logValues = logs.selectExpr("CAST(value AS STRING)").as[String]

  val events = logValues
    .map(parseFunction)
    .select(
      $"_1".alias("date").cast("timestamp"),
      $"_2".alias("uuid").cast("string")
    )

  val results = events
    .withWatermark("date", "1 day")
    .dropDuplicates("uuid", "date")
    .groupBy($"date")
    .count()

  /**
   * output
   */
  val query = results
    .writeStream
    .outputMode("update")
    .format("console")
    .option("truncate", "false")
    .trigger(Trigger.ProcessingTime("1 seconds"))
    .start()

  query.awaitTermination()

*and I use Play JSON to parse the input logs from Kafka; the parse function
looks like this:*

  def parseFunction(str: String): (Long, String) = {
    val json = Json.parse(str)
    // "time" is expected to be a JSON number of epoch seconds
    val timestamp = (json \ "time").get.toString().toLong
    val date = (timestamp / (60 * 60 * 24) * 24 - 8) * 60 * 60
    // note: toString() on a JsString keeps the surrounding quotes; .as[String] would strip them
    val uuid = (json \ "uuid").get.toString()
    (date, uuid)
  }
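
For clarity, the date expression above is arithmetically equivalent to the
restatement below (assuming "time" is epoch seconds): it takes the start of
the timestamp's UTC day and shifts it back 8 hours, presumably to line up
with a UTC+8 day boundary. dayBoundary is only an illustrative name, not
part of the job:

  // equivalent to (timestamp / (60 * 60 * 24) * 24 - 8) * 60 * 60
  val secondsPerHour = 60 * 60
  val secondsPerDay = 24 * secondsPerHour
  def dayBoundary(timestamp: Long): Long =
    (timestamp / secondsPerDay) * secondsPerDay - 8 * secondsPerHour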

and the Java heap space looks like this (I've increased the executor memory to 15g):

[image: image.png]
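
For reference, a minimal, untested sketch for reading the state store's own
memory numbers from the query progress (the field names are those of
StreamingQueryProgress and StateOperatorProgress):

  // print the rows and memory that each stateful operator reports
  val progress = query.lastProgress
  if (progress != null) {
    progress.stateOperators.foreach { op =>
      println(s"state rows: ${op.numRowsTotal}, state memory: ${op.memoryUsedBytes} bytes")
    }
  }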
Michael Armbrust wrote on Wed, Sep 13, 2017 at 2:23 AM:

> Can you show the full query you are running?
>
> On Tue, Sep 12, 2017 at 10:11 AM, 张万新  wrote:
>
>> Hi,
>>
>> I'm using Structured Streaming to count unique visits to our website. I
>> run Spark on YARN with 4 executor instances and have tried from 2 cores *
>> 5g memory up to 4 cores * 10g memory per executor, but there are frequent
>> full GCs, and once the count rises to more than about 4.5 million the
>> application blocks and finally crashes with an OOM. That seems
>> unreasonable. So is there any suggestion for optimizing the memory
>> consumption of SS? Thanks.
>>
>
>


Re: [SS] Any way to optimize memory consumption of SS?

2017-09-12 Thread Michael Armbrust
Can you show the full query you are running?

On Tue, Sep 12, 2017 at 10:11 AM, 张万新  wrote:

> Hi,
>
> I'm using Structured Streaming to count unique visits to our website. I
> run Spark on YARN with 4 executor instances and have tried from 2 cores *
> 5g memory up to 4 cores * 10g memory per executor, but there are frequent
> full GCs, and once the count rises to more than about 4.5 million the
> application blocks and finally crashes with an OOM. That seems
> unreasonable. So is there any suggestion for optimizing the memory
> consumption of SS? Thanks.
>


[SS] Any way to optimize memory consumption of SS?

2017-09-12 Thread 张万新
Hi,

I'm using Structured Streaming to count unique visits to our website. I run
Spark on YARN with 4 executor instances and have tried from 2 cores * 5g
memory up to 4 cores * 10g memory per executor, but there are frequent full
GCs, and once the count rises to more than about 4.5 million the application
blocks and finally crashes with an OOM. That seems unreasonable. So is there
any suggestion for optimizing the memory consumption of SS? Thanks.
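
For concreteness, here is the resource setup described above expressed as
Spark configuration. This is only a sketch: the property names are the
standard spark.executor.* settings, the values are the larger of the two
configurations I mentioned, and the application name is made up.

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder()
    .appName("unique-visits")                 // made-up name
    .config("spark.executor.instances", "4")  // 4 executor instances
    .config("spark.executor.cores", "4")      // 4 cores per executor
    .config("spark.executor.memory", "10g")   // 10g per executor
    .getOrCreate()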