There are expected to be about 5 million UUIDs in a day. I need to use this
field to drop duplicate records and then count them. If I simply count records
without using dropDuplicates, the job occupies less than 1 GB of memory, so I
believe most of the memory is taken by the state store that keeps the state
for dropDuplicates. But I cannot find a way to alleviate the problem.
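
One possible way to shrink that state, if an approximate count were acceptable,
would be to replace dropDuplicates with an approx_count_distinct aggregation,
which keeps one small HyperLogLog sketch per group instead of one state row per
UUID. A minimal sketch of the idea, reusing the events dataset from the code
quoted below:

  import org.apache.spark.sql.functions.approx_count_distinct

  // Approximate unique visitors per day: state no longer grows with the
  // number of distinct UUIDs, only with the number of "date" groups.
  val approxResults = events
    .withWatermark("date", "1 day")
    .groupBy($"date")
    .agg(approx_count_distinct("uuid").alias("uv"))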

Michael Armbrust <mich...@databricks.com> wrote on Fri, Sep 15, 2017 at 3:35 AM:

> How many UUIDs do you expect to have in a day?  That is likely where all
> the memory is being used.  Does it work without that?
>
> On Tue, Sep 12, 2017 at 8:42 PM, 张万新 <kevinzwx1...@gmail.com> wrote:
>
>> *Yes, my code is shown below (I also posted my code in another mail)*
>> /**
>>     * input
>>     */
>>   val logs = spark
>>     .readStream
>>     .format("kafka")
>>     .option("kafka.bootstrap.servers", BROKER_SERVER)
>>     .option("subscribe", TOPIC)
>>     .option("startingOffset", "latest")
>>     .load()
>>
>>   /**
>>     * process
>>     */
>>   val logValues = logs.selectExpr("CAST(value AS STRING)").as[String]
>>
>>   val events = logValues
>>     .map(parseFunction)
>>     .select(
>>       $"_1".alias("date").cast("timestamp"),
>>       $"_2".alias("uuid").cast("string")
>>     )
>>
>>   val results = events
>>     .withWatermark("date", "1 day")
>>     .dropDuplicates("uuid", "date")
>>     .groupBy($"date")
>>     .count()
>>
>>   /**
>>     * output
>>     */
>>   val query = results
>>     .writeStream
>>     .outputMode("update")
>>     .format("console")
>>     .option("truncate", "false")
>>     .trigger(Trigger.ProcessingTime("1 seconds"))
>>     .start()
>>
>>   query.awaitTermination()
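>>
>> One way to confirm where the memory goes would be to watch the state
>> operator metrics per trigger. A minimal sketch using the
>> StreamingQueryListener API (registered before the query is started; the
>> println output is illustrative only):
>>
>>   import org.apache.spark.sql.streaming.StreamingQueryListener
>>   import org.apache.spark.sql.streaming.StreamingQueryListener._
>>
>>   spark.streams.addListener(new StreamingQueryListener {
>>     override def onQueryStarted(event: QueryStartedEvent): Unit = ()
>>     override def onQueryProgress(event: QueryProgressEvent): Unit = {
>>       // numRowsTotal / memoryUsedBytes describe the dropDuplicates state store
>>       event.progress.stateOperators.foreach { op =>
>>         println(s"state rows = ${op.numRowsTotal}, memory = ${op.memoryUsedBytes} B")
>>       }
>>     }
>>     override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
>>   })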
>>
>> *and I use Play JSON to parse the input logs from Kafka; the parse function
>> looks like this:*
>>
>>   def parseFunction(str: String): (Long, String) = {
>>     val json = Json.parse(str)
>>     val timestamp = (json \ "time").get.toString().toLong
>>     val date = (timestamp / (60 * 60 * 24) * 24 -8) * 60 * 60
>>     val uuid = (json \ "uuid").get.toString()
>>     (date, uuid)
>>   }
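>>
>> A possible variant of the same function, assuming the "time" field is a JSON
>> number holding epoch seconds (as the arithmetic above implies): .as[Long] and
>> .as[String] avoid the surrounding quotes that JsValue.toString() leaves on
>> string fields, so the uuid kept in state is the bare string:
>>
>>   def parseFunctionAlt(str: String): (Long, String) = {
>>     val json = Json.parse(str)
>>     val timestamp = (json \ "time").as[Long]            // epoch seconds
>>     // truncate to the start of the day in UTC+8, still in epoch seconds
>>     val dayStart = (timestamp / 86400) * 86400 - 8 * 3600
>>     val uuid = (json \ "uuid").as[String]
>>     (dayStart, uuid)
>>   }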
>>
>> and the Java heap space looks like this (I've increased the executor memory
>> to 15g):
>>
>> [image: image.png]
>> Michael Armbrust <mich...@databricks.com> wrote on Wed, Sep 13, 2017 at 2:23 AM:
>>
>>> Can you show the full query you are running?
>>>
>>> On Tue, Sep 12, 2017 at 10:11 AM, 张万新 <kevinzwx1...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I'm using Structured Streaming to count unique visits to our website. I
>>>> run Spark on YARN with 4 executor instances and have tried from 2 cores * 5g
>>>> memory up to 4 cores * 10g memory per executor, but there are frequent
>>>> full GCs, and once the count rises past roughly 4.5 million the
>>>> application becomes blocked and finally crashes with an OOM. That seems
>>>> unreasonable. Is there any suggestion for optimizing the memory consumption
>>>> of Structured Streaming? Thanks.
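>>>>
>>>> For concreteness, the upper end of the resource configuration described
>>>> above would look roughly like this as Spark conf keys (the equivalent
>>>> spark-submit flags work the same; the app name is just a placeholder):
>>>>
>>>>   import org.apache.spark.sql.SparkSession
>>>>
>>>>   val spark = SparkSession.builder()
>>>>     .appName("unique-visits")                // placeholder name
>>>>     .master("yarn")
>>>>     .config("spark.executor.instances", "4")
>>>>     .config("spark.executor.cores", "4")
>>>>     .config("spark.executor.memory", "10g")
>>>>     .getOrCreate()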
>>>>
>>>
>>>
>
