Believe me, there are lots of people on this list who will send you code snippets if you ask... 😀
Yes, that is what Steve pointed out, suggesting also that for that simple exercise you should perform all operations on a Spark standalone instead (or, alternatively, use an NFS share on the cluster). I'd agree with his suggestion.... Let me suggest another alternative: https://community.cloud.databricks.com/ That's a ready-made cluster, and you can run your Spark app there as well as store data on the cluster (well, I haven't tried it myself, but I assume it's possible). I will try your script there, as I have an account (though I guess you'll get there before me.....). Try that out and let me know if you get stuck....

Kr

On Aug 5, 2017 8:40 PM, "Gourav Sengupta" <gourav.sengu...@gmail.com> wrote:

> Hi Marco,
>
> For the first time in several years, FOR THE VERY FIRST TIME, I am seeing someone actually executing code and providing a response. It feels wonderful that at least someone considered responding by executing the code, and did not filter out each and every technical detail to brood only on my superb social skills while claiming that the reason for ignoring the technical details is that they are elementary. I think that Steve is also the first person who could answer the WHY of an elementary question instead of saying "that is how it is", and who pointed out the correct documentation.
>
> That code works fantastically. But the problem which I have been trying to pin down is in writing out the data, not reading it.
>
> So if you try to read the data from a folder which has the same file across all the nodes, then it will work fine. In fact, that is what should work.
>
> What does not work is writing the file back and then reading it once again from the location you have written to; that is when the issue starts happening.
>
> Therefore, if in my code you were to save the pandas dataframe as a CSV file and then read it back, you would make the following observations:
>
> THE FOLLOWING WILL FAIL, SINCE THE FILE IS NOT ON ALL THE NODES
> ---------------------------------------------------------------------------
> pandasdf = pandas.DataFrame(numpy.random.randn(10000, 4), columns=list('ABCD'))
> pandasdf.to_csv("/Users/gouravsengupta/Development/spark/sparkdata/testdir/test.csv", header=True, sep=",", index=0)
> testdf = spark.read.load("/Users/gouravsengupta/Development/spark/sparkdata/testdir/")
> testdf.cache()
> testdf.count()
> ---------------------------------------------------------------------------
>
> THE FOLLOWING WILL WORK, BUT THE PROCESS WILL NOT USE THE NODE ON WHICH THE DATA DOES NOT EXIST
> ---------------------------------------------------------------------------
> pandasdf = pandas.DataFrame(numpy.random.randn(10000, 4), columns=list('ABCD'))
> pandasdf.to_csv("/Users/gouravsengupta/Development/spark/sparkdata/testdir/test.csv", header=True, sep=",", index=0)
> testdf = spark.read.load("file:///Users/gouravsengupta/Development/spark/sparkdata/testdir/")
> testdf.cache()
> testdf.count()
> ---------------------------------------------------------------------------
>
> If you execute my code, you will also, surprisingly, see that the writes on the nodes other than the master node do not complete moving the files from the _temporary folder to the main one.
>
> Regards,
> Gourav Sengupta
>
> On Fri, Aug 4, 2017 at 9:45 PM, Marco Mistroni <mmistr...@gmail.com> wrote:
>
>> Hello
>> please have a look at this. It's a simple script that just reads a dataframe n times, sleeping at random intervals. I used it to test memory issues that another user was experiencing on a spark cluster.
>>
>> You should run it like this, e.g.
>> spark-submit dataprocessing_Sample-2.py <path to tree_addhealth.csv> <num of iterations>
>>
>> I ran it on the cluster like this:
>>
>> ./spark-submit --master spark://ec2-54-218-113-119.us-west-2.compute.amazonaws.com:7077 /root/pyscripts/dataprocessing_Sample-2.py file:///root/pyscripts/tree_addhealth.csv
>>
>> hth, ping me back if you have issues.
>> I do agree with Steve's comments.... if you want to test your spark scripts just for playing, do it on a standalone server on your localhost. Moving to a cluster is just a matter of deploying your script and making sure you have a common place to read and store the data..... SysAdmin should give you this when they set up the cluster...
>>
>> kr
>>
>> On Fri, Aug 4, 2017 at 4:50 PM, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:
>>
>>> Hi Marco,
>>>
>>> I am sincerely obliged for your kind time and response. Can you please try the solution that you have so kindly suggested?
>>>
>>> It will be a lot of help if you could kindly execute the code that I have given. I don't think that anyone has yet.
>>>
>>> There are lots of fine responses to my question here, but if you read the last response from Simon, it comes the closest to being satisfactory. I am sure even he did not execute the code, but at least he came quite close to understanding what the problem is.
>>>
>>> Regards,
>>> Gourav Sengupta
>>>
>>> On Thu, Aug 3, 2017 at 7:59 PM, Marco Mistroni <mmistr...@gmail.com> wrote:
>>>
>>>> Hello
>>>> my 2 cents here, hope it helps.
>>>> If you want to just play around with Spark, I'd leave Hadoop out; it's an unnecessary dependency that you don't need for just running a python script.
>>>> Instead do the following:
>>>> - go to the root of your master / slave node and create a directory /root/pyscripts
>>>> - place your csv file there as well as the python script
>>>> - run the script to replicate the whole directory across the cluster (I believe it's called copy-script.sh)
>>>> - then run your spark-submit; it will be something like
>>>> ./spark-submit /root/pyscripts/mysparkscripts.py file:///root/pyscripts/tree_addhealth.csv 10 --master spark://ip-172-31-44-155.us-west-2.compute.internal:7077
>>>> - in your python script, as part of your processing, write the parquet file to directory /root/pyscripts
>>>>
>>>> If you have an AWS account and you are versatile with that (you need to set up bucket permissions etc.), you can just:
>>>> - store your file in one of your S3 buckets
>>>> - create an EMR cluster
>>>> - connect to the master or a slave
>>>> - run your script that reads from the s3 bucket and writes to the same s3 bucket
>>>>
>>>> Feel free to mail me privately; I have a working script I have used to test some code on a spark standalone cluster.
>>>> hth
>>>>
>>>> On Thu, Aug 3, 2017 at 10:30 AM, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:
>>>>
>>>>> Hi Steve,
>>>>>
>>>>> I love you mate, thanks a ton once again for ACTUALLY RESPONDING.
>>>>>
>>>>> I am now going through the documentation (https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3a_committer_architecture.md) and it makes much, much more sense now.
>>>>>
>>>>> Regards,
>>>>> Gourav Sengupta
>>>>>
>>>>> On Thu, Aug 3, 2017 at 10:09 AM, Steve Loughran <ste...@hortonworks.com> wrote:
>>>>>
>>>>>> On 2 Aug 2017, at 20:05, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:
>>>>>>
>>>>>> Hi Steve,
>>>>>>
>>>>>> I have written a sincere note of apology to everyone in a separate email. I sincerely request your kind forgiveness in advance if anything in my emails sounds impolite.
>>>>>>
>>>>>> Let me first start by thanking you.
>>>>>>
>>>>>> I know it looks like I formed all my opinions based on that document, but that is not the case at all. If you or anyone tries to execute the code that I have given, then they will see what I mean. Code speaks louder and better than words for me.
>>>>>>
>>>>>> So I am not saying you are wrong. I am asking you to verify, and expecting that someone will be able to correct a set of understandings that a moron like me has gained after long hours of not having anything better to do.
>>>>>>
>>>>>> SCENARIO: there are three files, file1.csv, file2.csv and file3.csv, stored in HDFS with replication 2, and there is a HADOOP cluster of three nodes. All these nodes have SPARK workers (executors) running on them.
They are stored in the following way:
>>>>>>
>>>>>> -----------------------------------------------------
>>>>>> | SYSTEM 1  | SYSTEM 2  | SYSTEM 3  |
>>>>>> | (worker1) | (worker2) | (worker3) |
>>>>>> | (master)  |           |           |
>>>>>> -----------------------------------------------------
>>>>>> | file1.csv |           | file1.csv |
>>>>>> -----------------------------------------------------
>>>>>> |           | file2.csv | file2.csv |
>>>>>> -----------------------------------------------------
>>>>>> | file3.csv | file3.csv |           |
>>>>>> -----------------------------------------------------
>>>>>>
>>>>>> CONSIDERATION BASED ON WHICH THE ABOVE SCENARIO HAS BEEN DRAWN:
>>>>>> HDFS replication does not store the same file on all the nodes in the cluster. So if I have three nodes and the replication is two, then the same file will be stored physically on two nodes in the cluster. Does that sound right?
>>>>>>
>>>>>> HDFS breaks files up into blocks (default = 128MB). If a .csv file is > 128MB then it will be broken up into blocks:
>>>>>>
>>>>>> file1.csv -> [block0001, block0002, block0003]
>>>>>>
>>>>>> and each block will be replicated. With replication = 2 there will be two copies of each block, but the file itself can span > 2 hosts.
>>>>>>
>>>>>> ASSUMPTION (STEVE, PLEASE CLARIFY THIS):
>>>>>> If SPARK is trying to process the records, then I am expecting that WORKER2 should not be processing file1.csv; similarly, WORKER1 should not be processing file2.csv, and WORKER3 should not be processing file3.csv. Because if WORKER2 were trying to process file1.csv, then it would actually cause network transmission of the file unnecessarily.
>>>>>>
>>>>>> Spark prefers to schedule work locally, so as to save on network traffic, but it optimizes for execution time over waiting for workers to be free on the node with the data.
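[Editor's aside] Steve's point that a replicated file can still span more than `replication` hosts can be sketched numerically. This is a toy model, not HDFS itself: the node names and the round-robin placement are invented for illustration (real HDFS placement is rack- and space-aware), but it shows how 3 blocks with replication 2 end up touching all 3 nodes.

```python
import math

BLOCK_SIZE = 128 * 1024 * 1024  # HDFS default block size (128 MB)

def split_into_blocks(file_size_bytes, block_size=BLOCK_SIZE):
    """Number of HDFS blocks a file of the given size occupies."""
    return max(1, math.ceil(file_size_bytes / block_size))

def place_replicas(num_blocks, nodes, replication=2):
    """Toy round-robin placement: each block lands on `replication`
    distinct nodes. Only meant to show that a file with more blocks
    than `replication` can span more than `replication` hosts."""
    return {
        f"block{i + 1:04d}": [nodes[(i + r) % len(nodes)] for r in range(replication)]
        for i in range(num_blocks)
    }

# A 300 MB file splits into 3 blocks of <= 128 MB each
blocks = split_into_blocks(300 * 1024 * 1024)
placement = place_replicas(blocks, ["SYSTEM1", "SYSTEM2", "SYSTEM3"])

# Each block sits on exactly 2 nodes, yet the *file* spans all 3 hosts
hosts = {n for replicas in placement.values() for n in replicas}
```

So "replication = 2" constrains each block, not the whole file, which is why the per-file table in the scenario above is only an approximation of what HDFS actually does.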
If a block is on nodes 2 and 3 but there is only a free thread on node 1, then node 1 gets the work.
>>>>>>
>>>>>> There are details on whether/how work across blocks takes place which I'm avoiding. For now, know that those formats which are "splittable" will have work scheduled by block. If you use Parquet/ORC/Avro for your data and compress with snappy, it will be split. This gives you maximum performance, as >1 thread can work on different blocks. That is, if file1 is split into three blocks, three worker threads can process it.
>>>>>>
>>>>>> ASSUMPTION BASED ON THE ABOVE ASSUMPTION (STEVE, ONCE AGAIN, PLEASE CLARIFY THIS):
>>>>>> If WORKER2 is not processing file1.csv, then how does it matter whether the file is there or not on the system at all? Should not SPARK just ask the workers to process the files which are available on the worker nodes? In case both WORKER2 and WORKER3 fail and are not available, then file2.csv will not be processed at all.
>>>>>>
>>>>>> Locality is best-effort, not guaranteed.
>>>>>>
>>>>>> ALSO, I DID POST THE CODE, AND I GENUINELY THINK THAT THE CODE SHOULD BE EXECUTED (it's been pointed out that I am learning SPARK, and even I did not take more than 13 mins to set up the cluster and run the code).
>>>>>>
>>>>>> Once you execute the code, you will find that:
>>>>>> 1. if the path starts with file:/// while reading back, then there is no error reported, but the records reported back are only those on the worker that is on the same node as the driver.
>>>>>> 2.
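[Editor's aside] Steve's scheduling rule, "local if possible, otherwise any free thread", can be sketched as follows. This is a deliberate simplification of Spark's scheduler (which also has delay scheduling, locality-wait timeouts, and several locality levels); the node names and data structures are invented for illustration.

```python
def schedule(block_locations, free_threads):
    """Toy locality-preferring scheduler: assign each block to a node
    holding a replica if one has a free thread, otherwise to any node
    with a free thread (best-effort locality, not guaranteed)."""
    assignments = {}
    for block, replicas in block_locations.items():
        # Prefer a node that already holds the block locally...
        local = [n for n in replicas if free_threads.get(n, 0) > 0]
        # ...but fall back to any node with capacity rather than wait
        candidates = local or [n for n, t in free_threads.items() if t > 0]
        node = candidates[0]
        free_threads[node] -= 1
        assignments[block] = node
    return assignments

# Steve's example: a block lives on node2 and node3,
# but only node1 has a free thread, so node1 gets the work
result = schedule({"block0001": ["node2", "node3"]},
                  {"node1": 1, "node2": 0, "node3": 0})
# result == {"block0001": "node1"}  (a non-local, network-paying read)
```

This is why a worker without the file can still end up processing it, at the cost of pulling the data over the network.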
>>>>>> also, you will notice that once you cache the file before writing, the partitions are distributed nicely across the workers; and while writing back, the dataframe partitions do write properly on the worker node on the master, but the workers on the other systems have their files written to the _temporary folder, which does not get copied back to the main folder. In spite of this, the job is not reported as failed in SPARK.
>>>>>>
>>>>>> This gets into the "commit protocol". You don't want to know all the dirty details (*) but essentially it's this:
>>>>>>
>>>>>> 1. Every worker writes its output to a directory under the destination directory, something like '$dest/_temporary/$appAttemptId/_temporary/$taskAttemptID'
>>>>>> 2. It is the spark driver which "commits" the job by moving the output from the individual workers from the temporary directories into $dest, then deleting $dest/_temporary
>>>>>> 3. For which it needs to be able to list all the output in $dest/_temporary
>>>>>>
>>>>>> In your case, only the output on the same node as the driver is being committed, because only those files can be listed and moved. The output on the other nodes isn't seen, so it isn't committed, nor cleaned up.
>>>>>>
>>>>>> Now in my own world, if I see the following things happening, something is going wrong (with me):
>>>>>> 1. SPARK transfers files from different systems to process, instead of processing them locally (I do not have code to prove this, and therefore it's just an assumption)
>>>>>> 2. SPARK cannot determine when the writes are failing on standalone cluster workers and reports success (code is there for this)
>>>>>> 3.
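[Editor's aside] The three commit steps Steve lists can be simulated on a local filesystem. This is a minimal sketch of that flow, not Hadoop's actual `FileOutputCommitter`; the helper names and file names are invented. It also shows why the failure mode above happens: the "driver" can only commit task output it can list, so files sitting on another machine's local disk are silently left in `_temporary`.

```python
import os
import shutil
import tempfile

def task_write(dest, app_attempt, task_attempt, filename, data):
    """Step 1: a 'worker' writes its output under the temporary
    attempt directory, $dest/_temporary/$appAttemptId/_temporary/$taskAttemptID."""
    tmp = os.path.join(dest, "_temporary", app_attempt, "_temporary", task_attempt)
    os.makedirs(tmp, exist_ok=True)
    with open(os.path.join(tmp, filename), "w") as f:
        f.write(data)

def driver_commit(dest, app_attempt):
    """Steps 2-3: the 'driver' commits the job by listing every task's
    output under _temporary, moving it into $dest, then deleting
    $dest/_temporary. Output it cannot list (e.g. on another node's
    local disk) is never committed and never cleaned up."""
    attempt_root = os.path.join(dest, "_temporary", app_attempt, "_temporary")
    for task in os.listdir(attempt_root):
        task_dir = os.path.join(attempt_root, task)
        for name in os.listdir(task_dir):
            shutil.move(os.path.join(task_dir, name), os.path.join(dest, name))
    shutil.rmtree(os.path.join(dest, "_temporary"))

# Two tasks write, then the driver commits; both part files land in dest
dest = tempfile.mkdtemp()
task_write(dest, "app-0001", "task-0001", "part-00000", "a,b,c\n")
task_write(dest, "app-0001", "task-0002", "part-00001", "d,e,f\n")
driver_commit(dest, "app-0001")
committed = sorted(os.listdir(dest))  # ['part-00000', 'part-00001']
```

With a shared filesystem (HDFS, NFS, S3 with the right committer), `driver_commit` sees every task's directory; with plain per-node local disks it sees only the driver node's, which matches the observed behaviour.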
SPARK reports back the number of records on the worker running on the master node when count() is given, without reporting an error while using file:///, and reports an error when I mention the path without file:/// (for SPARK 2.1.x onwards; code is there for this)
>>>>>>
>>>>>> As everyone's been saying, file:// requires a shared filestore, with uniform paths everywhere. That's needed to list the files to process, read the files in the workers and commit the final output. NFS cross-mounting is the simplest way to do this, especially as for three nodes HDFS is overkill: more services to keep running, no real fault tolerance. Export a directory tree from one of the servers, give the rest access to it, and don't worry about bandwidth use, as the shared disk itself will become the bottleneck.
>>>>>>
>>>>>> I very sincerely hope that with your genuine help the bar of language and social skills will be lowered for me, and that everyone will find a way to excuse me and not treat this email as a means to measure my extremely versatile and amazingly vivid social skills. It will be a lot of help to just focus on the facts related to machines, data, errors and (the language that I somehow understand better) code.
>>>>>>
>>>>>> My sincere apologies once again, as I am 100% sure that I did not meet the required social and language skills.
>>>>>>
>>>>>> Thanks a ton once again for your kindness, patience and understanding.
>>>>>>
>>>>>> Regards,
>>>>>> Gourav Sengupta
>>>>>>
>>>>>> * For the curious, the details of the v1 and v2 commit protocols are at https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3a_committer_architecture.md
>>>>>>
>>>>>> Like I said: you don't want to know the details, and you really don't want to step through Hadoop's FileOutputCommitter to see what's going on. The Spark side is much easier to follow.