I have no context on the ML side, but your streaming query itself opens up the
possibility of memory issues.

flattenedNER.registerTempTable("df")

querySelect = "SELECT col as entity, avg(sentiment) as sentiment, count(col) as count FROM df GROUP BY col"
finalDF = spark.sql(querySelect)

query = finalDF.writeStream.foreachBatch(processBatch).outputMode("complete").start()

Since this is a streaming query, the grouped aggregation keeps its state in the
state store, and because you use the complete output mode that state will grow
over time and eventually dominate the memory in the executors.
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#window-operations-on-event-time
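
If you need this aggregation in a long-running stream, one way to keep the state
bounded (a rough sketch, not your code) is to aggregate over an event-time window
with a watermark instead of an unbounded GROUP BY in complete mode. It assumes the
tweets carry an event-time column, here a hypothetical created_at:

from pyspark.sql.functions import window, avg, count, col

# hypothetical: flattenedNER would need an event-time column such as created_at
boundedDF = (flattenedNER
    .withWatermark("created_at", "10 minutes")
    .groupBy(window("created_at", "5 minutes"), col("col").alias("entity"))
    .agg(avg("sentiment").alias("sentiment"), count("col").alias("count")))

query = (boundedDF.writeStream
    .foreachBatch(processBatch)
    .outputMode("update")  # lets Spark evict state older than the watermark
    .start())

With update mode only the groups changed in each micro-batch are passed to
processBatch, so it would have to accumulate results across batches instead of
seeing the full table every time.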


On Tue, Apr 19, 2022 at 4:07 AM Bjørn Jørgensen <bjornjorgen...@gmail.com>
wrote:

> When did SpaCy have support for Spark?
>
> Try Spark NLP <https://nlp.johnsnowlabs.com>; it's made for Spark. They
> have a lot of notebooks at https://github.com/JohnSnowLabs/spark-nlp and
> they publish user guides at
> https://towardsdatascience.com/introduction-to-spark-nlp-foundations-and-basic-components-part-i-c83b7629ed59
>
>
>
>
> On Mon, Apr 18, 2022 at 4:17 PM Sean Owen <sro...@gmail.com> wrote:
>
>> It looks good; are you sure it even starts? The problem I see is that you
>> send a copy of the model from the driver for every task. Try broadcasting
>> the model instead. I'm not sure that resolves it, but it would be good
>> practice.
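>>
>> A rough sketch (not tested) of that broadcast pattern, assuming the spaCy
>> pipeline pickles cleanly (otherwise load it lazily once per executor):
>>
>> bc_model = spark.sparkContext.broadcast(spacy.load("en_core_web_sm"))
>>
>> def obtain_ner_udf(words):
>>     if not words:
>>         return None
>>     # one copy shipped per executor instead of one per task
>>     entities = bc_model.value(words)
>>     return [w.text + '_' + w.label_ for w in entities.ents]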
>>
>> On Mon, Apr 18, 2022 at 9:10 AM Xavier Gervilla <
>> xavier.gervi...@datapta.com> wrote:
>>
>>> Hi Team,
>>> <https://stackoverflow.com/questions/71841814/is-there-a-way-to-prevent-excessive-ram-consumption-with-the-spark-configuration>
>>>
>>> I'm developing a project that retrieves tweets on a 'host' app, streams
>>> them to Spark and, through a series of DataFrame operations, obtains the
>>> sentiment of each tweet and its entities, applying a sentiment model and a
>>> NER model respectively.
>>>
>>> The problem I've come across is that when applying the NER model, RAM
>>> consumption increases until the program stops with a memory error because
>>> there's no memory left. In addition, on the Spark UI I've seen that there's
>>> only one executor running (the driver executor), but using htop on the
>>> terminal I see that all 8 cores of the instance are running at 100%.
>>>
>>> The SparkSession is only configured to receive the tweets from the socket
>>> that connects to the second program, which sends the tweets. The DataFrame
>>> goes through some processing to obtain other properties of the tweet, like
>>> its sentiment (which causes no error even with less than 8 GB of RAM), and
>>> then the NER is applied.
>>>
>>> spark = SparkSession.builder.appName("TwitterStreamApp").getOrCreate()
>>> rawTweets = spark.readStream.format("socket").option("host", "localhost").option("port", 9008).load()
>>> tweets = rawTweets.selectExpr("CAST(value AS STRING)")
>>>
>>> # prior processing of the tweets
>>> sentDF = other_processing(tweets)
>>>
>>> # obtaining the column that contains the list of entities from a tweet
>>> nerDF = ner_classification(sentDF)
>>>
>>>
>>> This is the code of the functions related to obtaining the NER, the
>>> "main call" and the UDF function.
>>>
>>> nerModel = spacy.load("en_core_web_sm")
>>>
>>> # main call, applies the UDF function to every tweet from the "tweet" column
>>> def ner_classification(words):
>>>     ner_list = udf(obtain_ner_udf, ArrayType(StringType()))
>>>     words = words.withColumn("nerlist", ner_list("tweet"))
>>>     return words
>>>
>>> # udf function
>>> def obtain_ner_udf(words):
>>>     # if the tweet is empty return None
>>>     if words == "":
>>>         return None
>>>     # else: applying the NER model (spaCy en_core_web_sm)
>>>     entities = nerModel(words)
>>>
>>>     # returns a list of the form ['entity1_label1', 'entity2_label2', ...]
>>>     return [word.text + '_' + word.label_ for word in entities.ents]
>>>
>>>
>>>
>>> And lastly I map each entity with the sentiment from its tweet and
>>> obtain the average sentiment of the entity and the number of appearances.
>>>
>>> flattenedNER = nerDF.select(nerDF.sentiment, explode(nerDF.nerlist))
>>> flattenedNER.registerTempTable("df")
>>>
>>> querySelect = "SELECT col as entity, avg(sentiment) as sentiment, count(col) as count FROM df GROUP BY col"
>>> finalDF = spark.sql(querySelect)
>>>
>>> query = finalDF.writeStream.foreachBatch(processBatch).outputMode("complete").start()
>>>
>>>
>>> The resulting DF is processed with a function that collects each column
>>> into a list and prints them.
>>>
>>> def processBatch(df, epoch_id):
>>>     entities = [str(t.entity) for t in df.select("entity").collect()]
>>>     sentiments = [float(t.sentiment) for t in df.select("sentiment").collect()]
>>>     counts = [int(row.asDict()['count']) for row in df.select("count").collect()]
>>>
>>>     print(entities, sentiments, counts)
>>>
>>>
>>> At first I tried other NER models from Flair and they had the same effect:
>>> after printing the first batch, memory use starts increasing until it fails
>>> and stops the execution with a memory error. When applying a "simple"
>>> function instead of the NER model in the UDF, such as return words.split(),
>>> there's no such error, so it should not be the ingested data that's causing
>>> the overload but the model.
>>>
>>> Is there a way to prevent the excessive RAM consumption? Why is only the
>>> driver executor running, with no other executors being created? How could I
>>> prevent it from collapsing when applying the NER model?
>>>
>>> Thanks in advance!
>>>
>>>
>
> --
> Bjørn Jørgensen
> Vestre Aspehaug 4, 6010 Ålesund
> Norge
>
> +47 480 94 297
>
