Hi everyone,

I have a huge DataFrame with 1 billion rows, and each row is a nested list. I want to train some ML models on this DataFrame, but because of its size I get an out-of-memory error on one of my nodes when I run the fit function.
Currently my configuration is: 144 cores, 16 cores for each of the 8 nodes; 100 GB of RAM for each slave and 100 GB of RAM for the driver. I set spark.driver.maxResultSize to 20 GB. Do you have any suggestions so far?

I can think of splitting the data into multiple DataFrames and then training the model on each one individually, but besides the longer runtime, I learned that the fit function overwrites the previous model each time I call it. Isn't there a way to get fit to train the new model building on the previously trained model?

Thanks

On Sun, Aug 6, 2017 at 11:04 PM, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:

> Hi Marco,
>
> thanks a ton, I will surely use those alternatives.
>
>
> Regards,
> Gourav Sengupta
>
> On Sun, Aug 6, 2017 at 3:45 PM, Marco Mistroni <mmistr...@gmail.com> wrote:
>
>> Sengupta,
>> further to this, if you try the following notebook in Databricks cloud, it will read a .csv file, write it to a parquet file and read it again (just to count the number of rows stored).
>> Please note that the path to the csv file might differ for you.....
>> So, what you will need to do is
>> 1 - create an account at community.cloud.databricks.com
>> 2 - upload the .csv file onto the Data of your Databricks private cluster
>> 3 - run the script.
>> That will store the data on the distributed filesystem of the Databricks cloud (DBFS).
>>
>> It's worth investing in this free Databricks cloud, as it can create a cluster for you with minimal effort, and it's a very easy way to test your Spark scripts on a real cluster.
>>
>> hope this helps
>> kr
>>
>> ##################################
>> from random import randint
>> from time import sleep
>> import logging
>>
>> from pyspark.sql import SparkSession
>>
>> logger = logging.getLogger(__name__)
>> logger.setLevel(logging.INFO)
>> ch = logging.StreamHandler()
>> logger.addHandler(ch)
>>
>>
>> def read_parquet_file(spark, parquetFileName):
>>     logger.info('Reading now the parquet files we just created...:%s', parquetFileName)
>>     parquet_data = spark.read.parquet(parquetFileName)
>>     logger.info('Parquet file has %s rows', parquet_data.count())
>>
>>
>> def dataprocessing(filePath, count, spark):
>>     """Read the CSV, write it as parquet, read the parquet back; repeat `count` times."""
>>     logger.info('Iter count is:%s', count)
>>     if count == 0:
>>         print('exiting')
>>     else:
>>         df_traffic_tmp = spark.read.format("csv").option("header", "true").load(filePath)
>>         logger.info('#############################DataSet has:%s', df_traffic_tmp.count())
>>         logger.info('Writing to a parquet file')
>>         parquetFileName = "dbfs:/myParquetDf2.parquet"
>>         # overwrite, otherwise a second iteration fails because the path already exists
>>         df_traffic_tmp.write.mode("overwrite").parquet(parquetFileName)
>>         sleepInterval = randint(10, 100)
>>         logger.info('#############################Sleeping for %s', sleepInterval)
>>         sleep(sleepInterval)
>>         read_parquet_file(spark, parquetFileName)
>>         dataprocessing(filePath, count - 1, spark)
>>
>>
>> filename = '/FileStore/tables/wb4y1wrv1502027870004/tree_addhealth.csv'  # This path might differ for you
>> iterations = 1
>> logger.info('----------------------')
>> logger.info('Filename:%s', filename)
>> logger.info('Iterations:%s', iterations)
>> logger.info('----------------------')
>>
>> logger.info('........Starting spark..........Loading from %s for %s iterations', filename, iterations)
>> logger.info('Starting up....')
>> # SparkSession is the entry point; it replaces the separate SQLContext wrapper
>> spark = SparkSession.builder.appName("Data Processing").getOrCreate()
>> dataprocessing(filename, iterations, spark)
>> logger.info('Out of here..')
>> ######################################
>>
>>
>> On Sat, Aug 5, 2017 at 9:09 PM, Marco Mistroni <mmistr...@gmail.com> wrote:
>>
>>> Uh believe me there are lots of ppl on this list who will send u code snippets if u ask... 😀
>>>
>>> Yes, that is what Steve pointed out, also suggesting that for that simple exercise you should perform all operations on a standalone Spark instance instead (or alternatively use NFS on the cluster).
>>> I'd agree with his suggestion....
>>> I suggest u another alternative:
>>> https://community.cloud.databricks.com/
>>>
>>> That's a ready-made cluster, and you can run your Spark app as well as store data on the cluster (well, I haven't tried it myself but I assume it's possible). Try that out...
>>> I will try ur script there as I have an account there (though I guess you'll get there before me.....)
>>>
>>> Try that out and let me know if u get stuck....
>>> Kr
>>>
>>> On Aug 5, 2017 8:40 PM, "Gourav Sengupta" <gourav.sengu...@gmail.com> wrote:
>>>
>>>> Hi Marco,
>>>>
>>>> For the first time in several years, FOR THE VERY FIRST TIME, I am seeing someone actually executing code and providing a response. It feels wonderful that at least someone chose to respond by executing code, and did not just filter out each and every technical detail to brood only on my superb social skills, while claiming that the reason for ignoring the technical details is that they are elementary. I think that Steve is also the first person who answered the WHY of an elementary question instead of saying that is how it is, and pointed to the correct documentation.
>>>>
>>>> That code works fantastically. But the problem I have been trying to track down occurs while writing out the data, not while reading it.
>>>>
>>>>
>>>> So if you try to read the data from the same folder, which has the same file across all the nodes, then it will work fine. In fact, that is what should work.
>>>>
>>>> What does not work is writing the file back and then reading it once again from the location you have written it to; that is when the issue starts happening.
>>>>
>>>> Therefore, if in my code you were to save the pandas DataFrame as a CSV file and then read it, you will observe the following:
>>>>
>>>> FOLLOWING WILL FAIL SINCE THE FILE IS NOT IN ALL THE NODES
>>>> ------------------------------------------------------------
>>>> import numpy
>>>> import pandas
>>>>
>>>> pandasdf = pandas.DataFrame(numpy.random.randn(10000, 4), columns=list('ABCD'))
>>>> pandasdf.to_csv("/Users/gouravsengupta/Development/spark/sparkdata/testdir/test.csv", header=True, sep=",", index=False)
>>>> # format="csv": load() otherwise uses the default data source (parquet)
>>>> testdf = spark.read.load("/Users/gouravsengupta/Development/spark/sparkdata/testdir/", format="csv", header=True)
>>>> testdf.cache()
>>>> testdf.count()
>>>> ------------------------------------------------------------
>>>>
>>>>
>>>> FOLLOWING WILL WORK BUT THE PROCESS WILL NOT AT ALL USE THE NODES IN WHICH THE DATA DOES NOT EXIST
>>>> ------------------------------------------------------------
>>>> import numpy
>>>> import pandas
>>>>
>>>> pandasdf = pandas.DataFrame(numpy.random.randn(10000, 4), columns=list('ABCD'))
>>>> pandasdf.to_csv("/Users/gouravsengupta/Development/spark/sparkdata/testdir/test.csv", header=True, sep=",", index=False)
>>>> testdf = spark.read.load("file:///Users/gouravsengupta/Development/spark/sparkdata/testdir/", format="csv", header=True)
>>>> testdf.cache()
>>>> testdf.count()
>>>> ------------------------------------------------------------
>>>>
>>>>
>>>> Also, if you execute my code, you will surprisingly see that the writes on the nodes other than the master node do not complete moving the files from the _temporary folder to the main one.
>>>>
>>>>
>>>> Regards,
>>>> Gourav Sengupta
>>>>
>>>>
>>>>
>>>> On Fri, Aug 4, 2017 at 9:45 PM, Marco Mistroni <mmistr...@gmail.com> wrote:
>>>>
>>>>> Hello,
>>>>> please have a look at this. It's a simple script that just reads a dataframe n times, sleeping at random intervals. I used it to test memory issues that another user was experiencing on a Spark cluster.
>>>>>
>>>>> You should run it like this, e.g.
>>>>> spark-submit dataprocessing_Sample-2.py <path to tree_addhealth.csv> <num of iterations>
>>>>>
>>>>> I ran it on the cluster like this:
>>>>>
>>>>> ./spark-submit --master spark://ec2-54-218-113-119.us-west-2.compute.amazonaws.com:7077 /root/pyscripts/dataprocessing_Sample-2.py file:///root/pyscripts/tree_addhealth.csv
>>>>>
>>>>> hth, ping me back if you have issues.
>>>>> I do agree with Steve's comments.... if you want to test your Spark scripts just for playing, do it on a standalone server on your localhost. Moving to a cluster is just a matter of deploying your script and making sure you have a common place from which to read and store the data..... SysAdmins should give you this when they set up the cluster...
>>>>>
>>>>> kr
>>>>>
>>>>>
>>>>> On Fri, Aug 4, 2017 at 4:50 PM, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:
>>>>>
>>>>>> Hi Marco,
>>>>>>
>>>>>> I am sincerely obliged for your kind time and response. Can you please try the solution that you have so kindly suggested?
>>>>>>
>>>>>> It will be a lot of help if you could kindly execute the code that I have given. I don't think that anyone has yet.
>>>>>>
>>>>>> There are lots of fine responses to my question here, but if you read the last response from Simon, it comes the closest to being satisfactory. I am sure even he did not execute the code, but at least he came quite close to understanding what the problem is.
>>>>>>
>>>>>>
>>>>>> Regards,
>>>>>> Gourav Sengupta
>>>>>>
>>>>>>
>>>>>> On Thu, Aug 3, 2017 at 7:59 PM, Marco Mistroni <mmistr...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>> my 2 cents here, hope it helps.
>>>>>>> If you just want to play around with Spark, I'd leave Hadoop out; it's an unnecessary dependency that you don't need for just running a Python script.
>>>>>>> Instead do the following:
>>>>>>> - go to the root of your master / slave node and create a directory /root/pyscripts
>>>>>>> - place your csv file there as well as the python script
>>>>>>> - run the script to replicate the whole directory across the cluster (I believe it's called copy-script.sh)
>>>>>>> - then run your spark-submit (options such as --master must come before the script; anything after the script is passed to it as arguments); it will be something like
>>>>>>>   ./spark-submit --master spark://ip-172-31-44-155.us-west-2.compute.internal:7077 /root/pyscripts/mysparkscripts.py file:///root/pyscripts/tree_addhealth.csv 10
>>>>>>> - in your python script, as part of your processing, write the parquet file in directory /root/pyscripts
>>>>>>>
>>>>>>> If you have an AWS account and you are versatile with it (you need to set up bucket permissions etc.), you can just
>>>>>>> - store your file in one of your S3 buckets
>>>>>>> - create an EMR cluster
>>>>>>> - connect to the master or a slave
>>>>>>> - run your script that reads from the S3 bucket and writes to the same S3 bucket
>>>>>>>
>>>>>>>
>>>>>>> Feel free to mail me privately; I have a working script I have used to test some code on a Spark standalone cluster.
>>>>>>> hth
>>>>>>>
>>>>>>> On Thu, Aug 3, 2017 at 10:30 AM, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Steve,
>>>>>>>>
>>>>>>>> I love you mate, thanks a ton once again for ACTUALLY RESPONDING.
>>>>>>>>
>>>>>>>> I am now going through the documentation (https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3a_committer_architecture.md) and it makes much, much more sense now.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Gourav Sengupta
>>>>>>>>
>>>>>>>> On Thu, Aug 3, 2017 at 10:09 AM, Steve Loughran <ste...@hortonworks.com> wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 2 Aug 2017, at 20:05, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Hi Steve,
>>>>>>>>>
>>>>>>>>> I have written a sincere note of apology to everyone in a separate email. I sincerely ask your kind forgiveness in advance if anything in my emails sounds impolite.
>>>>>>>>>
>>>>>>>>> Let me first start by thanking you.
>>>>>>>>>
>>>>>>>>> I know it looks like I formed all my opinions based on that document, but that is not the case at all. If you or anyone tries to execute the code that I have given, they will see what I mean. Code speaks louder and better than words for me.
>>>>>>>>>
>>>>>>>>> So I am not saying you are wrong. I am asking you to verify, and expecting that someone will be able to correct the understanding that a moron like me has gained after long hours of not having anything better to do.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> SCENARIO: there are three files, file1.csv, file2.csv and file3.csv, stored in HDFS with replication 2, and there is a HADOOP cluster of three nodes. All these nodes have SPARK workers (executors) running on them. The files are stored in the following way:
>>>>>>>>>
>>>>>>>>> -------------------------------------------
>>>>>>>>> | SYSTEM 1    | SYSTEM 2    | SYSTEM 3    |
>>>>>>>>> | (worker1)   | (worker2)   | (worker3)   |
>>>>>>>>> | (master)    |             |             |
>>>>>>>>> -------------------------------------------
>>>>>>>>> | file1.csv   |             | file1.csv   |
>>>>>>>>> -------------------------------------------
>>>>>>>>> |             | file2.csv   | file2.csv   |
>>>>>>>>> -------------------------------------------
>>>>>>>>> | file3.csv   | file3.csv   |             |
>>>>>>>>> -------------------------------------------
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> CONSIDERATION BASED ON WHICH THE ABOVE SCENARIO HAS BEEN DRAWN:
>>>>>>>>> HDFS replication does not store the same file on all the nodes in the cluster. So if I have three nodes and the replication is two, then the same file will be stored physically on two nodes in the cluster. Does that sound right?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> HDFS breaks files up into blocks (default = 128 MB). If a .csv file is > 128 MB then it will be broken up into blocks
>>>>>>>>>
>>>>>>>>> file1.csv -> [block0001, block0002, block0003]
>>>>>>>>>
>>>>>>>>> and each block will be replicated. With replication = 2 there will be two copies of each block, but the file itself can span > 2 hosts.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> ASSUMPTION (STEVE PLEASE CLARIFY THIS):
>>>>>>>>> If SPARK is processing the records, then I am expecting that WORKER2 should not be processing file1.csv, and similarly WORKER1 should not be processing file2.csv and WORKER3 should not be processing file3.csv. Because if WORKER2 were processing file1.csv, it would cause unnecessary network transmission of the file.
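The block and replica layout Steve describes, and the locality question Gourav raises, can be explored with a small pure-Python toy model. This is a hypothetical sketch, not HDFS or Spark code: the host names, the round-robin placement and the helpers `split_into_blocks`, `place_replicas` and `schedule` are invented for illustration. Real HDFS placement is rack-aware, and Spark waits a short, configurable time (`spark.locality.wait`) for a local slot before falling back, which the toy skips.

```python
import math

BLOCK_SIZE = 128 * 1024 * 1024  # HDFS default block size (128 MB)
HOSTS = ["system1", "system2", "system3"]  # system1 also runs the master/driver


def split_into_blocks(file_name, size_bytes, block_size=BLOCK_SIZE):
    """Return hypothetical block ids: ceil(size / block_size) blocks, at least one."""
    n_blocks = max(1, math.ceil(size_bytes / block_size))
    return [f"{file_name}#blk{i + 1:04d}" for i in range(n_blocks)]


def place_replicas(blocks, hosts=HOSTS, replication=2):
    """Toy round-robin placement: every block gets `replication` distinct hosts.

    Real HDFS placement is rack-aware; this only shows that the blocks of
    one file can be spread over more hosts than the replication factor.
    """
    return {
        block: [hosts[(i + r) % len(hosts)] for r in range(replication)]
        for i, block in enumerate(blocks)
    }


def schedule(block, placement, free_hosts):
    """Best-effort locality: prefer a free host that holds a replica;
    otherwise run on the first free host and read the block remotely.
    Assumes at least one host is free."""
    local = [h for h in placement[block] if h in free_hosts]
    if local:
        return local[0], "local"
    return free_hosts[0], "remote"


if __name__ == "__main__":
    blocks = split_into_blocks("file1.csv", 300 * 1024 * 1024)  # a 300 MB file
    placement = place_replicas(blocks)
    hosts_used = sorted({h for replicas in placement.values() for h in replicas})
    print(len(blocks), hosts_used)  # 3 ['system1', 'system2', 'system3']
    # block 2 lives on system2 and system3, but only system1 has a free slot
    print(schedule(blocks[1], placement, free_hosts=["system1"]))  # ('system1', 'remote')
```

With replication 2 the three blocks of a 300 MB file still touch all three hosts, and a task whose replicas sit on busy nodes runs elsewhere rather than waiting, so a worker can end up reading a file it does not hold.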
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Spark prefers to schedule work locally, so as to save on network traffic, but it prioritises execution time over waiting for workers to become free on the node with the data. If a block is on nodes 2 and 3 but there is only a free thread on node 1, then node 1 gets the work.
>>>>>>>>>
>>>>>>>>> There are details about whether/how work is split across blocks, which I'm avoiding. For now, know that formats which are "splittable" will have work scheduled by block. If you use Parquet/ORC/Avro for your data and compress with snappy, it will be split. This gives you maximum performance, as more than one thread can work on different blocks. That is, if file1 is split into three blocks, three worker threads can process it.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> ASSUMPTION BASED ON THE ABOVE ASSUMPTION (STEVE ONCE AGAIN, PLEASE CLARIFY THIS):
>>>>>>>>> If WORKER2 is not processing file1.csv, then how does it matter whether the file is on that system at all? Should SPARK not just ask the workers to process the files which are available on their own nodes? In case both WORKER2 and WORKER3 fail and are not available, then file2.csv will not be processed at all.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Locality is best-effort, not guaranteed.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> ALSO I DID POST THE CODE AND I GENUINELY THINK THAT THE CODE SHOULD BE EXECUTED (It's been pointed out that I am learning SPARK, and even I did not take more than 13 mins to set up the cluster and run the code).
>>>>>>>>>
>>>>>>>>> Once you execute the code, you will find that:
>>>>>>>>> 1. If the path starts with file:/// while reading back, then no error is reported, but the number of records reported back is only the records on the worker that runs on the same machine as the master.
>>>>>>>>> 2. Also, you will notice that once you cache the file before writing, the partitions are distributed nicely across the workers, and while writing back, the dataframe partitions are written properly by the worker on the master node, but the workers on the other systems have their files written in a _temporary folder, which do not get copied back to the main folder. In spite of this, the job is not reported as failed in SPARK.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> This gets into the "commit protocol". You don't want to know all the dirty details (*) but essentially it's this:
>>>>>>>>>
>>>>>>>>> 1. Every worker writes its output to a directory under the destination directory, something like '$dest/_temporary/$appAttemptId/_temporary/$taskAttemptID'
>>>>>>>>> 2. It is the Spark driver which "commits" the job by moving the output of the individual workers from the temporary directories into $dest, then deleting $dest/_temporary
>>>>>>>>> 3. For which it needs to be able to list all the output in $dest/_temporary
>>>>>>>>>
>>>>>>>>> In your case, only the output on the same node as the driver is being committed, because only those files can be listed and moved. The output on the other nodes isn't seen, so it isn't committed, nor cleaned up.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Now, in my own world, as I see it, the following things are happening, which means something is going wrong (with me):
>>>>>>>>> 1. SPARK transfers files from different systems to process them, instead of processing them locally (I do not have code to prove this, and therefore it's just an assumption)
>>>>>>>>> 2. SPARK cannot determine when the writes on standalone cluster workers are failing, and reports success (code is there for this)
>>>>>>>>> 3. When count() is called, SPARK reports back only the number of records on the worker running on the master node, without reporting an error, while using file:///, and reports an error when I give the path without file:/// (for SPARK 2.1.x onwards; code is there for this)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> As everyone's been saying, file:// requires a shared filestore, with uniform paths everywhere. That's needed to list the files to process, read the files in the workers and commit the final output. NFS cross-mounting is the simplest way to do this, especially as for three nodes HDFS is overkill: more services to keep running, no real fault tolerance. Export a directory tree from one of the servers, give the rest access to it, and don't worry about bandwidth use, as the shared disk itself will become the bottleneck.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I very sincerely hope that with your genuine help the bar of language and social skills will be lowered for me, and that everyone will find a way to excuse me and not treat this email as a measure of my extremely versatile and amazingly vivid social skills. It will be a lot of help to just focus on the facts related to machines, data, errors and (the language that I somehow understand better) code.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> My sincere apologies once again, as I am 100% sure that I did not meet the required social and language skills.
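Steve's point that file:// needs a shared filestore with uniform paths everywhere can be illustrated with a toy pure-Python model in which each node's "local disk" is a separate temporary directory. This is a hypothetical sketch, not Spark's file source: the node names, the helpers `make_disks`, `write_partition` and `count_rows_seen_from`, and the 100-row partitions are invented, and it models only the listing done from the driver's node. It mirrors Gourav's observation that a file:/// read can quietly return only the rows stored on the driver's machine.

```python
import os
import tempfile


def make_disks(root, nodes, shared):
    """Give every node a 'local disk' under root. shared=True mimics an NFS
    cross-mount: all nodes see the same directory tree at the same path."""
    disks = {node: root if shared else os.path.join(root, node) for node in nodes}
    for path in disks.values():
        os.makedirs(path, exist_ok=True)
    return disks


def write_partition(disk, rel_path, rows):
    """A worker writes its partition to the disk it can see."""
    path = os.path.join(disk, rel_path)
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, "w") as f:
        f.writelines(f"{r}\n" for r in rows)


def count_rows_seen_from(disk, rel_dir):
    """List and read rel_dir on one node's disk only. Files on other nodes'
    private disks are simply not listed: no error, just fewer rows."""
    directory = os.path.join(disk, rel_dir)
    total = 0
    for name in sorted(os.listdir(directory)):
        with open(os.path.join(directory, name)) as f:
            total += sum(1 for _ in f)
    return total


def demo(shared):
    nodes = ["system1", "system2", "system3"]  # the driver runs on system1
    with tempfile.TemporaryDirectory() as root:
        disks = make_disks(root, nodes, shared)
        for i, node in enumerate(nodes):
            write_partition(disks[node], f"out/part-{i:05d}.csv", range(100))
        return count_rows_seen_from(disks["system1"], "out")


if __name__ == "__main__":
    print(demo(shared=False))  # 100: only the driver-local partition is visible
    print(demo(shared=True))   # 300: every partition is visible
```

The only difference between the two runs is whether every node sees the same directory tree, which is what an NFS export (or HDFS/S3) provides.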
>>>>>>>>>
>>>>>>>>> Thanks a ton once again for your kindness, patience and understanding.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Gourav Sengupta
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> * For the curious, the details of the v1 and v2 commit protocols are at
>>>>>>>>> https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3a_committer_architecture.md
>>>>>>>>>
>>>>>>>>> Like I said: you don't want to know the details, and you really don't want to step through Hadoop's FileOutputCommitter to see what's going on. The Spark side is much easier to follow.
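Steve's three-step description of the commit protocol, applied to nodes without a shared filestore, can be modelled in a small pure-Python toy. This is a hypothetical sketch, not Hadoop's FileOutputCommitter: it collapses task commit and job commit into a single driver-side step, and the node names, attempt ids and the helpers `task_write` and `driver_commit_job` are invented for illustration.

```python
import os
import shutil
import tempfile


def task_write(disk, dest, app_attempt, task_attempt, name, text):
    """Step 1: a task writes under $dest/_temporary/$appAttemptId/_temporary/$taskAttemptID
    on the disk of the node it runs on."""
    task_dir = os.path.join(disk, dest, "_temporary", app_attempt, "_temporary", task_attempt)
    os.makedirs(task_dir, exist_ok=True)
    with open(os.path.join(task_dir, name), "w") as f:
        f.write(text)


def driver_commit_job(driver_disk, dest):
    """Steps 2 and 3: list everything under $dest/_temporary that the driver can
    see, move it into $dest, then delete $dest/_temporary. Only the driver's own
    disk is examined, so output on other nodes is never listed or cleaned up."""
    final_dir = os.path.join(driver_disk, dest)
    temp_root = os.path.join(final_dir, "_temporary")
    committed = []
    for dirpath, _dirnames, filenames in os.walk(temp_root):
        for name in filenames:
            shutil.move(os.path.join(dirpath, name), os.path.join(final_dir, name))
            committed.append(name)
    shutil.rmtree(temp_root, ignore_errors=True)
    return sorted(committed)


def demo():
    nodes = ["system1", "system2", "system3"]  # the driver runs on system1
    with tempfile.TemporaryDirectory() as root:
        disks = {n: os.path.join(root, n) for n in nodes}  # no shared filestore
        for i, node in enumerate(nodes):
            task_write(disks[node], "out", "0", f"attempt_{i}", f"part-{i:05d}.csv", "a,b\n")
        committed = driver_commit_job(disks["system1"], "out")
        # nodes whose task output is still stranded in _temporary
        stranded = [n for n in nodes[1:]
                    if os.path.isdir(os.path.join(disks[n], "out", "_temporary"))]
        return committed, stranded


if __name__ == "__main__":
    print(demo())  # (['part-00000.csv'], ['system2', 'system3'])
```

The driver commits only its own partition and raises no error, while the task output on the other two nodes stays in _temporary, which matches what Gourav reported: a "successful" job with files left behind on the non-master workers.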