I have no context on ML, but your streaming query opens up the possibility of memory issues.
    flattenedNER.registerTempTable("df")

    querySelect = "SELECT col as entity, avg(sentiment) as sentiment, count(col) as count FROM df GROUP BY col"
    finalDF = spark.sql(querySelect)

    query = finalDF.writeStream.foreachBatch(processBatch).outputMode("complete").start()

Since this is a streaming query, the grouped aggregation incurs a state store, and since you use the "complete" output mode, the state store will grow over time and come to dominate memory in the executors.
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#window-operations-on-event-time

On Tue, Apr 19, 2022 at 4:07 AM Bjørn Jørgensen <bjornjorgen...@gmail.com> wrote:

> When did spaCy get support for Spark?
>
> Try Spark NLP <https://nlp.johnsnowlabs.com>; it's made for Spark. They
> have a lot of notebooks at https://github.com/JohnSnowLabs/spark-nlp and
> they publish user guides at
> https://towardsdatascience.com/introduction-to-spark-nlp-foundations-and-basic-components-part-i-c83b7629ed59
>
> On Mon, 18 Apr 2022 at 16:17, Sean Owen <sro...@gmail.com> wrote:
>
>> It looks good; are you sure it even starts? The problem I see is that you
>> send a copy of the model from the driver for every task. Try broadcasting
>> the model instead. I'm not sure that resolves it, but it would be good
>> practice.
>>
>> On Mon, Apr 18, 2022 at 9:10 AM Xavier Gervilla <
>> xavier.gervi...@datapta.com> wrote:
>>
>>> Hi Team,
>>> <https://stackoverflow.com/questions/71841814/is-there-a-way-to-prevent-excessive-ram-consumption-with-the-spark-configuration>
>>>
>>> I'm developing a project that retrieves tweets on a 'host' app, streams
>>> them to Spark, and, with different DataFrame operations, obtains the
>>> sentiment of the tweets and their entities by applying a sentiment model
>>> and a NER model respectively.
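The unbounded-state issue described above can be illustrated with a stdlib-only simulation (this is not Spark API, and not code from this thread): in "complete" output mode the aggregation must keep a running (sum, count) for every key ever seen, while a watermark lets Spark drop state for windows older than the watermark. The window and watermark values below are illustrative assumptions.

```python
# Simulation of streaming aggregation state (NOT the Spark API).
# state maps (window, entity) -> [sentiment_sum, count].
from collections import defaultdict

def update_state(state, batch):
    """batch is a list of (event_time_minute, entity, sentiment) tuples."""
    for minute, entity, sentiment in batch:
        s = state[(minute, entity)]
        s[0] += sentiment
        s[1] += 1

def evict(state, watermark_minute):
    """Mimic watermark eviction: forget windows older than the watermark.
    Without this ("complete" mode), state only ever grows."""
    for key in [k for k in state if k[0] < watermark_minute]:
        del state[key]

state = defaultdict(lambda: [0.0, 0])
update_state(state, [(1, "Spark", 0.8), (1, "Spark", 0.4), (2, "spaCy", -0.2)])
evict(state, watermark_minute=2)  # window 1 is dropped; only window 2 remains
```

In real Spark this corresponds to adding an event-time column, calling `withWatermark`, grouping by a time window, and using "append" or "update" output instead of "complete".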
>>> The problem I've come across is that when applying the NER model, the
>>> RAM consumption increases until the program stops with a memory error
>>> because there's no memory left to execute. In addition, in the Spark UI
>>> I've seen that there's only one executor running, the driver executor,
>>> but using htop on the terminal I see that the 8 cores of the instance are
>>> executing at 100%.
>>>
>>> The SparkSession is only configured to receive the tweets from the
>>> socket that connects with the second program that sends the tweets. The
>>> DataFrame goes through some processing to obtain other properties of the
>>> tweet, like its sentiment (which causes no error even with less than 8 GB
>>> of RAM), and then the NER is applied.
>>>
>>>     spark = SparkSession.builder.appName("TwitterStreamApp").getOrCreate()
>>>     rawTweets = spark.readStream.format("socket").option("host", "localhost").option("port", 9008).load()
>>>     tweets = rawTweets.selectExpr("CAST(value AS STRING)")
>>>
>>>     # prior processing of the tweets
>>>     sentDF = other_processing(tweets)
>>>
>>>     # obtaining the column that contains the list of entities from a tweet
>>>     nerDF = ner_classification(sentDF)
>>>
>>> This is the code of the functions related to obtaining the NER: the
>>> "main call" and the UDF function.
>>>     nerModel = spacy.load("en_core_web_sm")
>>>
>>>     # main call, applies the UDF function to every tweet from the "tweet" column
>>>     def ner_classification(words):
>>>         ner_list = udf(obtain_ner_udf, ArrayType(StringType()))
>>>         words = words.withColumn("nerlist", ner_list("tweet"))
>>>         return words
>>>
>>>     # udf function
>>>     def obtain_ner_udf(words):
>>>         # if the tweet is empty return None
>>>         if words == "":
>>>             return None
>>>         # else: apply the NER model (spaCy en_core_web_sm)
>>>         entities = nerModel(words)
>>>         # returns a list of the form ['entity1_label1', 'entity2_label2', ...]
>>>         return [word.text + '_' + word.label_ for word in entities.ents]
>>>
>>> And lastly, I map each entity to the sentiment of its tweet and obtain
>>> the average sentiment of the entity and its number of appearances.
>>>
>>>     flattenedNER = nerDF.select(nerDF.sentiment, explode(nerDF.nerlist))
>>>     flattenedNER.registerTempTable("df")
>>>
>>>     querySelect = "SELECT col as entity, avg(sentiment) as sentiment, count(col) as count FROM df GROUP BY col"
>>>     finalDF = spark.sql(querySelect)
>>>
>>>     query = finalDF.writeStream.foreachBatch(processBatch).outputMode("complete").start()
>>>
>>> The resulting DataFrame is processed with a function that collects each
>>> column into a list and prints it.
>>>     def processBatch(df, epoch_id):
>>>         entities = [str(t.entity) for t in df.select("entity").collect()]
>>>         sentiments = [float(t.sentiment) for t in df.select("sentiment").collect()]
>>>         counts = [int(row.asDict()['count']) for row in df.select("count").collect()]
>>>
>>>         print(entities, sentiments, counts)
>>>
>>> At first I tried other NER models from Flair, and they have the same
>>> effect: after printing the first batch, memory use starts increasing
>>> until it fails and stops the execution because of the memory error. When
>>> applying a "simple" function instead of the NER model on the UDF, such as
>>> return words.split(), there's no such error, so the ingested data should
>>> not be what's causing the overload, but the model.
>>>
>>> Is there a way to prevent the excessive RAM consumption? Why is only the
>>> driver executor running and no other executors generated? How could I
>>> prevent it from collapsing when applying the NER model?
>>>
>>> Thanks in advance!
>>>

--
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297
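For clarity, the GROUP BY query in the quoted message can be restated as plain Python over the flattened (entity, sentiment) pairs; the `aggregate` helper below is illustrative, not code from the thread.

```python
# Plain-Python equivalent of:
#   SELECT col AS entity, avg(sentiment), count(col) FROM df GROUP BY col
from collections import defaultdict

def aggregate(pairs):
    """pairs: iterable of (entity, sentiment). Returns
    {entity: (average_sentiment, count)}."""
    sums = defaultdict(lambda: [0.0, 0])
    for entity, sentiment in pairs:
        sums[entity][0] += sentiment
        sums[entity][1] += 1
    return {e: (s / n, n) for e, (s, n) in sums.items()}

result = aggregate([("Spark", 1.0), ("Spark", 0.0), ("spaCy", 0.5)])
# result["Spark"] == (0.5, 2); result["spaCy"] == (0.5, 1)
```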