Uh believe me there are lots of ppl on this list who will send u code
snippets if u ask... 😀

Yes that is what Steve pointed out, suggesting also that for that simple
exercise you should perform all operations on a spark standalone instead
(or alt. Use an nfs on the cluster)
I'd agree with his suggestion....
I suggest u another alternative:
https://community.cloud.databricks.com/

That's a ready made cluster and you can run your spark app as well store
data on the cluster (well I haven't tried myself but I assume it's
possible).   Try that out... I will try ur script there as I have an
account there (though I guess I'll get there before me.....)

Try that out and let me know if u get stuck....
Kr

On Aug 5, 2017 8:40 PM, "Gourav Sengupta" <gourav.sengu...@gmail.com> wrote:

> Hi Marco,
>
> For the first time in several years FOR THE VERY FIRST TIME. I am seeing
> someone actually executing code and providing response. It feel wonderful
> that at least someone considered to respond back by executing code and just
> did not filter out each and every technical details to brood only on my
> superb social skills, while claiming the reason for ignoring technical
> details is that it elementary. I think that Steve also is the first person
> who could answer the WHY of an elementary question instead of saying that
> is how it is and pointed out to the correct documentation.
>
> That code works fantastically. But the problem which I have tried to find
> out is while writing out the data and not reading it.
>
>
> So if you see try to read the data from the same folder which has the same
> file across all the nodes then it will work fine. In fact that is what
> should work.
>
> What does not work is that if you try to write back the file and then read
> it once again from the location you have written that is when the issue
> starts happening.
>
> Therefore if in my code you were to save the pandas dataframe as a CSV
> file and then read it then you will find the following observations:
>
> FOLLOWING WILL FAIL SINCE THE FILE IS NOT IN ALL THE NODES
> ------------------------------------------------------------
> ------------------------------------------------------------
> ------------------------------------------------------------
> ---------------------------
> pandasdf = pandas.DataFrame(numpy.random.randn(10000, 4),
> columns=list('ABCD'))
> pandasdf.to_csv("/Users/gouravsengupta/Development/spark/sparkdata/testdir/test.csv",
> header=True, sep=",", index=0)
> testdf = spark.read.load("/Users/gouravsengupta/Development/spark/
> sparkdata/testdir/")
> testdf.cache()
> testdf.count()
> ------------------------------------------------------------
> ------------------------------------------------------------
> ------------------------------------------------------------
> ---------------------------
>
>
> FOLLOWING WILL WORK BUT THE PROCESS WILL NOT AT ALL USE THE NODE IN WHICH
> THE DATA DOES NOT EXISTS
> ------------------------------------------------------------
> ------------------------------------------------------------
> ------------------------------------------------------------
> ---------------------------
> pandasdf = pandas.DataFrame(numpy.random.randn(10000, 4),
> columns=list('ABCD'))
> pandasdf.to_csv("/Users/gouravsengupta/Development/spark/sparkdata/testdir/test.csv",
> header=True, sep=",", index=0)
> testdf = spark.read.load("file:///Users/gouravsengupta/
> Development/spark/sparkdata/testdir/")
> testdf.cache()
> testdf.count()
> ------------------------------------------------------------
> ------------------------------------------------------------
> ------------------------------------------------------------
> ---------------------------
>
>
> if you execute my code then also you will surprisingly see that the writes
> in the nodes which is not the master node does not complete moving the
> files from the _temporary folder to the main one.
>
>
> Regards,
> Gourav Sengupta
>
>
>
> On Fri, Aug 4, 2017 at 9:45 PM, Marco Mistroni <mmistr...@gmail.com>
> wrote:
>
>> Hello
>>  please have a look at this. it'sa simple script that just read a
>> dataframe for n time, sleeping at random interval. i used it to test memory
>> issues that another user was experiencing on a spark cluster
>>
>> you should run it like this e.g
>> spark-submit dataprocessing_Sample.-2py <path to tree_addhealth.csv> <num
>> of iterations>
>>
>> i ran it on the cluster like this
>>
>> ./spark-submit --master spark://ec2-54-218-113-119.us-
>> west-2.compute.amazonaws.com:7077   
>> /root/pyscripts/dataprocessing_Sample-2.py
>> file:///root/pyscripts/tree_addhealth.csv
>>
>> hth, ping me back if you have issues
>> i do agree with Steve's comments.... if you want to test your  spark
>> script s just for playing, do it on  a standaone server on your localhost.
>> Moving to a c luster is just a matter of deploying your script and mke sure
>> you have a common place where to read and store the data..... SysAdmin
>> should give you this when they setup the cluster...
>>
>> kr
>>
>>
>>
>>
>> On Fri, Aug 4, 2017 at 4:50 PM, Gourav Sengupta <
>> gourav.sengu...@gmail.com> wrote:
>>
>>> Hi Marco,
>>>
>>> I am sincerely obliged for your kind time and response. Can you please
>>> try the solution that you have so kindly suggested?
>>>
>>> It will be a lot of help if you could kindly execute the code that I
>>> have given. I dont think that anyone has yet.
>>>
>>> There are lots of fine responses to my question here, but if you read
>>> the last response from Simon, it comes the closest to being satisfactory. I
>>> am sure even he did not execute the code, but at least he came quite close
>>> to understanding what the problem is.
>>>
>>>
>>> Regards,
>>> Gourav Sengupta
>>>
>>>
>>> On Thu, Aug 3, 2017 at 7:59 PM, Marco Mistroni <mmistr...@gmail.com>
>>> wrote:
>>>
>>>> Hello
>>>>  my 2 cents here, hope it helps
>>>> If you want to just to play around with Spark, i'd leave Hadoop out,
>>>> it's an unnecessary dependency that you dont need for just running a python
>>>> script
>>>> Instead do the following:
>>>> - got to the root of our master / slave node. create a directory
>>>> /root/pyscripts
>>>> - place your csv file there as well as the python script
>>>> - run the script to replicate the whole directory  across the cluster
>>>> (i believe it's called copy-script.sh)
>>>> - then run your spark-submit , it will be something lke
>>>>     ./spark-submit /root/pyscripts/mysparkscripts.py
>>>> file:///root/pyscripts/tree_addhealth.csv 10 --master
>>>> spark://ip-172-31-44-155.us-west-2.compute.internal:7077
>>>> - in your python script, as part of your processing, write the parquet
>>>> file in directory /root/pyscripts
>>>>
>>>> If you have an AWS account and you are versatile with that - you need
>>>> to setup bucket permissions etc - , you can just
>>>> - store your file in one of your S3 bucket
>>>> - create an EMR cluster
>>>> - connect to master or slave
>>>> - run your  scritp that reads from the s3 bucket and write to the same
>>>> s3 bucket
>>>>
>>>>
>>>> Feel free to mail me privately, i have a working script i have used to
>>>> test some code on spark standalone cluster
>>>> hth
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Thu, Aug 3, 2017 at 10:30 AM, Gourav Sengupta <
>>>> gourav.sengu...@gmail.com> wrote:
>>>>
>>>>> Hi Steve,
>>>>>
>>>>> I love you mate, thanks a ton once again for ACTUALLY RESPONDING.
>>>>>
>>>>> I am now going through the documentation (
>>>>> https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP
>>>>> -13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/t
>>>>> ools/hadoop-aws/s3a_committer_architecture.md) and it makes much much
>>>>> more sense now.
>>>>>
>>>>> Regards,
>>>>> Gourav Sengupta
>>>>>
>>>>> On Thu, Aug 3, 2017 at 10:09 AM, Steve Loughran <
>>>>> ste...@hortonworks.com> wrote:
>>>>>
>>>>>>
>>>>>> On 2 Aug 2017, at 20:05, Gourav Sengupta <gourav.sengu...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>> Hi Steve,
>>>>>>
>>>>>> I have written a sincere note of apology to everyone in a separate
>>>>>> email. I sincerely request your kind forgiveness before hand if anything
>>>>>> does sound impolite in my emails, in advance.
>>>>>>
>>>>>> Let me first start by thanking you.
>>>>>>
>>>>>> I know it looks like I formed all my opinion based on that document,
>>>>>> but that is not the case at all. If you or anyone tries to execute the 
>>>>>> code
>>>>>> that I have given then they will see what I mean. Code speaks louder and
>>>>>> better than words for me.
>>>>>>
>>>>>> So I am not saying you are wrong. I am asking verify and expecting
>>>>>> someone will be able to correct  a set of understanding that a moron like
>>>>>> me has gained after long hours of not having anything better to do.
>>>>>>
>>>>>>
>>>>>> SCENARIO: there are two files file1.csv and file2.csv stored in HDFS
>>>>>> with replication 2 and there is a HADOOP cluster of three nodes. All 
>>>>>> these
>>>>>> nodes have SPARK workers (executors) running in them.  Both are stored in
>>>>>> the following way:
>>>>>> -----------------------------------------------------
>>>>>> | SYSTEM 1 |  SYSTEM 2 | SYSTEM 3 |
>>>>>> | (worker1)   |  (worker2)    |  (worker3)   |
>>>>>> | (master)     |                     |                    |
>>>>>> -----------------------------------------------------
>>>>>> | file1.csv      |                     | file1.csv     |
>>>>>> -----------------------------------------------------
>>>>>> |                    |  file2.csv      | file2.csv     |
>>>>>> -----------------------------------------------------
>>>>>> | file3.csv      |  file3.csv      |                   |
>>>>>> -----------------------------------------------------
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> CONSIDERATION BASED ON WHICH ABOVE SCENARIO HAS BEEN DRAWN:
>>>>>> HDFS replication does not store the same file in all the nodes in the
>>>>>> cluster. So if I have three nodes and the replication is two then the 
>>>>>> same
>>>>>> file will be stored physically in two nodes in the cluster. Does that 
>>>>>> sound
>>>>>> right?
>>>>>>
>>>>>>
>>>>>> HDFS breaks files up into blocks (default = 128MB). If a .csv file is
>>>>>> > 128 then it will be broken up into blocks
>>>>>>
>>>>>> file1.cvs -> [block0001, block002, block0003]
>>>>>>
>>>>>> and each block will be replicated. With replication = 2 there will be
>>>>>> two copies of each block, but the file itself can span > 2 hosts.
>>>>>>
>>>>>>
>>>>>> ASSUMPTION  (STEVE PLEASE CLARIFY THIS):
>>>>>> If SPARK is trying to process to the records then I am expecting that
>>>>>> WORKER2 should not be processing file1.csv, and similary WORKER 1 should
>>>>>> not be processing file2.csv and WORKER3 should not be processing 
>>>>>> file3.csv.
>>>>>> Because in case WORKER2 was trying to process file1.csv then it will
>>>>>> actually causing network transmission of the file unnecessarily.
>>>>>>
>>>>>>
>>>>>> Spark prefers to schedule work locally, so as to save on network
>>>>>> traffic, but it schedules for execution time over waiting for workers 
>>>>>> free
>>>>>> on the node with the data. IF a block is on nodes 2 and 3 but there is 
>>>>>> only
>>>>>> a free thread on node 1, then node 1 gets the work
>>>>>>
>>>>>> There's details on whether/how work across blocks takes place which
>>>>>> I'm avoiding. For now know those formats which are "splittable" will have
>>>>>> work scheduled by block. If you use Parquet/ORC/avro for your data and
>>>>>> compress with snappy, it will be split. This gives you maximum 
>>>>>> performance
>>>>>> as >1 thread can work on different blocks. That is, if file1 is split 
>>>>>> into
>>>>>> three blocks, three worker threads can process it.
>>>>>>
>>>>>>
>>>>>> ASSUMPTION BASED ON ABOVE ASSUMPTION (STEVE ONCE AGAIN, PLEASE
>>>>>> CLARIFY THIS):
>>>>>> if WORKER 2 is not processing file1.csv then how does it matter
>>>>>> whether the file is there or not at all in the system? Should not SPARK
>>>>>> just ask the workers to process the files which are avialable in the 
>>>>>> worker
>>>>>> nodes? In case both WORKER2 and WORKER3 fails and are not available then
>>>>>> file2.csv will not be processed at all.
>>>>>>
>>>>>>
>>>>>> locality is best-effort, not guaranteed.
>>>>>>
>>>>>>
>>>>>> ALSO I DID POST THE CODE AND I GENUINELY THINK THAT THE CODE SHOULD
>>>>>> BE EXECUTED (Its been pointed out that I am learning SPARK, and even I 
>>>>>> did
>>>>>> not take more than 13 mins to set up the cluster and run the code).
>>>>>>
>>>>>> Once you execute the code then you will find that:
>>>>>> 1.  if the path starts with file:/// while reading back then there
>>>>>> is no error reported, but the number of records reported back are only
>>>>>> those records in the worker which also has the server.
>>>>>> 2. also you will notice that once you cache the file before writing
>>>>>> the partitions are ditributed nicely across the workers, and while 
>>>>>> writing
>>>>>> back, the dataframe partitions does write properly to the worker node in
>>>>>> the Master, but the workers in the other system have the files written in
>>>>>> _temporary folder which does not get copied back to the main folder.
>>>>>> Inspite of this the job is not reported as failed in SPARK.
>>>>>>
>>>>>>
>>>>>> This gets into the "commit protocol". You don't want to know all the
>>>>>> dirty details (*) but essentially its this
>>>>>>
>>>>>> 1. Every worker writes its output to a directory under the
>>>>>> destination directory, something like '$dest/_temporary/$appAtt
>>>>>> emptId/_temporary/$taskAttemptID'
>>>>>> 2. it is the spark driver which "commits" the job by moving the
>>>>>> output from the individual workers from the temporary directories into
>>>>>> $dest, then deleting $dest/_temporary
>>>>>> 3. For which it needs to be able to list all the output in
>>>>>> $dest/_temporary
>>>>>>
>>>>>> In your case, only the output on the same node of the driver is being
>>>>>> committed, because only those files can be listed and moved. The output 
>>>>>> on
>>>>>> the other nodes isn't seen, so isn't committed, nor cleaned up.
>>>>>>
>>>>>>
>>>>>>
>>>>>> Now in my own world, if I see, the following things are happening,
>>>>>> something is going wrong (with me):
>>>>>> 1. SPARK transfers files from different systems to process, instead
>>>>>> of processing them locally (I do not have code to prove this, and 
>>>>>> therefore
>>>>>> its just an assumption)
>>>>>> 2. SPARK cannot determine when the writes are failing in standalone
>>>>>> clusters workers and reports success (code is there for this)
>>>>>> 3. SPARK reports back number of records in the worker running in the
>>>>>> master node when count() is given without reporting an error while using
>>>>>> file:/// and reports an error when I mention the path without
>>>>>> file:/// (for SPARK 2.1.x onwards, code is there for this)
>>>>>>
>>>>>>
>>>>>>
>>>>>> s everyone's been saying, file:// requires a shared filestore, with
>>>>>> uniform paths everywhere. That's needed to list the files to process, 
>>>>>> read
>>>>>> the files in the workers and commit the final output. NFS cross-mounting 
>>>>>> is
>>>>>> the simplest way to do this, especially as for three nodes HDFS is
>>>>>> overkill: more services to keep running, no real fault tolerance. Export 
>>>>>> a
>>>>>> directory tree from one of the servers, give the rest access to it, don't
>>>>>> worry about bandwidth use as the shared disk itself will become the
>>>>>> bottleneck
>>>>>>
>>>>>>
>>>>>>
>>>>>> I very sincerely hope with your genuine help the bar of language and
>>>>>> social skills will be lowered for me. And everyone will find a way to
>>>>>> excuse me and not qualify this email as a means to measure my extremely
>>>>>> versatile and amazingly vivid social skills. It will be a lot of help to
>>>>>> just focus on the facts related to machines, data, error and (the 
>>>>>> language
>>>>>> that I somehow understand better) code.
>>>>>>
>>>>>>
>>>>>> My sincere apologies once again, as I am 100% sure that I did not
>>>>>> meet the required social and language skills.
>>>>>>
>>>>>> Thanks a ton once again for your kindness, patience and understanding.
>>>>>>
>>>>>>
>>>>>> Regards,
>>>>>> Gourav Sengupta
>>>>>>
>>>>>>
>>>>>> * for the curious, the details of the v1 and v2 commit protocols are
>>>>>> https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-
>>>>>> 13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/to
>>>>>> ols/hadoop-aws/s3a_committer_architecture.md
>>>>>>
>>>>>> Like I said: you don't want to know the details, and you really don't
>>>>>> want to step through Hadoop's FileOutputCommitter to see what's going on.
>>>>>> The Spark side is much easier to follow.
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to