It looks good, are you sure it even starts? the problem I see is that you
send a copy of the model from the driver for every task. Try broadcasting
the model instead. I'm not sure if that resolves it but would be a good
practice.

On Mon, Apr 18, 2022 at 9:10 AM Xavier Gervilla <xavier.gervi...@datapta.com>
wrote:

> Hi Team,
> <https://stackoverflow.com/questions/71841814/is-there-a-way-to-prevent-excessive-ram-consumption-with-the-spark-configuration>
>
> I'm developing a project that retrieves tweets on a 'host' app, streams
> them to Spark and with different operations with DataFrames obtains the
> Sentiment of the tweets and their entities applying a Sentiment model and a
> NER model respectively.
>
> The problem I've come across is that when applying the NER model, the RAM
> consumption increases until the program stops with a memory error because
> there's no memory left to execute. In addition, on SparkUI I've seen that
> there's only one executor running, the executor driver, but using htop on
> the terminal I see that the 8 cores of the instance are executing at 100%.
>
> The SparkSession is only configured to receive the tweets from the socket
> that connects with the second program that sends the tweets. The DataFrame
> goes through some processing to obtain other properties of the tweet like
> its sentiment (which causes no error even with less than 8GB of RAM) and
> then the NER is applied.
>
> *spark = SparkSession.builder.appName(**"TwitterStreamApp"**).getOrCreate()
> rawTweets = spark.readStream.**format**(**"socket"**).option(**"host"**, 
> **"localhost"**).option(**"port"**,**9008**).load()
> tweets = rawTweets.selectExpr(**"CAST(value AS STRING)"**)
>
> **#prior processing of the tweets**
> sentDF = other_processing(tweets)
>
> **#obtaining the column that contains the list of entities from a tweet**
> nerDF = ner_classification(sentDF)*
>
>
> This is the code of the functions related to obtaining the NER, the "main
> call" and the UDF function.
>
> *nerModel = spacy.load(**"en_core_web_sm"**)
>
> **#main call, applies the UDF function to every tweet from the "tweet" 
> column**def* *ner_classification**(**words**):
>     ner_list = udf(obtain_ner_udf, ArrayType(StringType()))
>     words = words.withColumn(**"nerlist"**, ner_list(**"tweet"**))
>     **return** words
>
> **#udf function**def* *obtain_ner_udf**(**words**):
>     **#if the tweet is empty return None*
>     *if** words == **""**:
>         **return* *None*
>     *#else: applying the NER model (Spacy en_core_web_sm)**
>     entities = nerModel(words)
>
>     **#returns a list of the form ['entity1_label1', 'entity2_label2',...]*
>     *return** [ word.text + **'_'** + word.label_ **for** word **in** 
> entities.ents ]*
>
>
>
> And lastly I map each entity with the sentiment from its tweet and obtain
> the average sentiment of the entity and the number of appearances.
>
> *flattenedNER = nerDF.select(nerDF.sentiment, explode(nerDF.nerlist))
> flattenedNER.registerTempTable(**"df"**)
>
>
> querySelect = **"SELECT col as entity, avg(sentiment) as sentiment, 
> count(col) as count FROM df GROUP BY col"**
> finalDF = spark.sql(querySelect)
>
> query = 
> finalDF.writeStream.foreachBatch(processBatch).outputMode(**"complete"**).start()*
>
>
> The resulting DF is processed with a function that separates each column
> in a list and prints it.
>
> *def* *processBatch**(**df**,* *epoch_id**):*    *entities* *=* 
> *[**str**(**t**.**entity**)* *for* *t* *in* 
> *df**.**select**(**"entity"**).**collect**()]*
>     *sentiments* *=* *[**float**(**t**.**sentiment**)* *for* *t* *in* 
> *df**.**select**(**"sentiment"**).**collect**()]*
>     *counts* *=* *[**int**(**row**.**asDict**()[**'count'**])* *for* *row* 
> *in* *df**.**select**(**"count"**).**collect**()]*
>
> *    print(**entities**,* *sentiments**,* *counts**)*
>
>
> At first I tried with other NER models from Flair they have the same
> effect, after printing the first batch memory use starts increasing until
> it fails and stops the execution because of the memory error. When applying
> a "simple" function instead of the NER model, such as *return
> words.split()* on the UDF there's no such error so the data ingested
> should not be what's causing the overload but the model.
>
> Is there a way to prevent the excessive RAM consumption? Why is there only
> the driver executor and no other executors are generated? How could I
> prevent it from collapsing when applying the NER model?
>
> Thanks in advance!
>
>

Reply via email to